Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::TABLE_FIELDS;
3use crate::pools;
4use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
5use crate::types::{DbMode, Mode, Params, TableOptions};
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
/// PostgreSQL backend handle: connection settings, the per-call query parameters
/// and the connection pool used to run statements.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration; it prefixes the
    /// transaction keys and the table-field cache keys built in this file.
    pub default: String,
    /// Query-building state; reset each time `table()` is called.
    pub params: Params,
    /// Connection pool from which non-transactional statements take a connection.
    pub client: Pools,
}
21
22impl Pgsql {
23    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24        let port = connection
25            .hostport
26            .parse::<i32>()
27            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29        let cp_connection = connection.clone();
30        let config = object! {
31            debug: cp_connection.debug,
32            username: cp_connection.username,
33            userpass: cp_connection.userpass,
34            database: cp_connection.database,
35            hostname: cp_connection.hostname,
36            hostport: port,
37            charset: cp_connection.charset.str(),
38        };
39        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
40
41        let pools = pgsql.pools()?;
42        Ok(Self {
43            connection,
44            default: default.clone(),
45            params: Params::default("pgsql"),
46            client: pools,
47        })
48    }
49
50    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
51        let thread_id = format!("{:?}", thread::current().id());
52        let key = format!("{}{}", self.default, thread_id);
53
54        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
55            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
56
57            match result {
58                Some(Ok(e)) => {
59                    if self.connection.debug {
60                        info!("查询成功: {} {}", thread_id.clone(), sql);
61                    }
62                    (true, e.rows)
63                }
64                Some(Err(e)) => {
65                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
66                    (false, JsonValue::from(e.to_string()))
67                }
68                None => {
69                    error!("事务查询失败: 未找到事务连接 {thread_id}");
70                    (false, JsonValue::from("未找到事务连接"))
71                }
72            }
73        } else {
74            let mut guard = match self.client.get_guard() {
75                Ok(g) => g,
76                Err(e) => {
77                    // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
78                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
79                    return (false, JsonValue::from(e.to_string()));
80                }
81            };
82            match guard.conn().query(sql) {
83                Ok(e) => {
84                    if self.connection.debug {
85                        info!("查询成功: {} {}", thread_id.clone(), sql);
86                    }
87                    (true, e.rows)
88                }
89                Err(ref e) if Self::is_retriable_error(e) => {
90                    // 查询执行时连接断开,丢弃坏连接后重试一次
91
92                    guard.discard();
93                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
94                    self.client.flush_idle();
95                    warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
96                    thread::sleep(std::time::Duration::from_millis(200));
97                    let mut guard2 = match self.client.get_guard() {
98                        Ok(g) => g,
99                        Err(e) => {
100                            error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
101                            return (false, JsonValue::from(e.to_string()));
102                        }
103                    };
104                    match guard2.conn().query(sql) {
105                        Ok(e) => {
106                            if self.connection.debug {
107                                info!("查询成功(重试): {} {}", thread_id.clone(), sql);
108                            }
109                            (true, e.rows)
110                        }
111                        Err(e) => {
112                            error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
113                            (false, JsonValue::from(e.to_string()))
114                        }
115                    }
116                }
117                Err(e) => {
118                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
119                    (false, JsonValue::from(e.to_string()))
120                }
121            }
122        }
123    }
124    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
125        let thread_id = format!("{:?}", thread::current().id());
126        let key = format!("{}{}", self.default, thread_id);
127
128        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
129            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
130
131            match result {
132                Some(Ok(e)) => {
133                    if self.connection.debug {
134                        info!("提交成功: {} {}", thread_id.clone(), sql);
135                    }
136                    if sql.contains("INSERT") {
137                        (true, e.rows)
138                    } else {
139                        (true, e.affect_count.into())
140                    }
141                }
142                Some(Err(e)) => {
143                    error!("事务提交失败: {thread_id} {e}");
144                    (false, JsonValue::from(e.to_string()))
145                }
146                None => {
147                    error!("事务执行失败: 未找到事务连接 {thread_id}");
148                    (false, JsonValue::from("未找到事务连接"))
149                }
150            }
151        } else {
152            // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
153            let mut guard = match self.client.get_guard() {
154                Ok(g) => g,
155                Err(e) => {
156                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
157                    return (false, JsonValue::from(e.to_string()));
158                }
159            };
160            match guard.conn().execute(sql) {
161                Ok(e) => {
162                    if self.connection.debug {
163                        info!("提交成功: {} {}", thread_id.clone(), sql);
164                    }
165                    if sql.contains("INSERT") {
166                        (true, e.rows)
167                    } else {
168                        (true, e.affect_count.into())
169                    }
170                }
171                Err(ref e) if Self::is_retriable_error(e) => {
172                    // 执行时连接断开,丢弃坏连接后重试一次
173
174                    guard.discard();
175                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
176                    self.client.flush_idle();
177                    warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
178                    thread::sleep(std::time::Duration::from_millis(200));
179                    let mut guard2 = match self.client.get_guard() {
180                        Ok(g) => g,
181                        Err(e) => {
182                            error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
183                            return (false, JsonValue::from(e.to_string()));
184                        }
185                    };
186                    match guard2.conn().execute(sql) {
187                        Ok(e) => {
188                            if self.connection.debug {
189                                info!("提交成功(重试): {} {}", thread_id.clone(), sql);
190                            }
191                            if sql.contains("INSERT") {
192                                (true, e.rows)
193                            } else {
194                                (true, e.affect_count.into())
195                            }
196                        }
197                        Err(e) => {
198                            error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
199                            (false, JsonValue::from(e.to_string()))
200                        }
201                    }
202                }
203                Err(e) => {
204                    error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
205                    (false, JsonValue::from(e.to_string()))
206                }
207            }
208        }
209    }
210
    /// Reports whether `e` is a retriable connection-class error: a dropped
    /// connection, an I/O error or a timeout.
    ///
    /// `query` and `execute` use this to decide whether a failed non-transactional
    /// statement gets its single retry.
    fn is_retriable_error(e: &PgsqlError) -> bool {
        matches!(
            e,
            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
        )
    }
218}
219
220impl DbMode for Pgsql {
221    fn database_tables(&mut self) -> JsonValue {
222        let sql = "SHOW TABLES".to_string();
223        match self.sql(sql.as_str()) {
224            Ok(e) => {
225                let mut list = vec![];
226                for item in e.members() {
227                    for (_, value) in item.entries() {
228                        list.push(value.clone());
229                    }
230                }
231                list.into()
232            }
233            Err(_) => {
234                array![]
235            }
236        }
237    }
238
239    fn database_create(&mut self, name: &str) -> bool {
240        let sql = format!("CREATE DATABASE {name}");
241
242        let (state, data) = self.execute(sql.as_str());
243        match state {
244            true => data.as_bool().unwrap_or(true),
245            false => {
246                error!("创建数据库失败: {data:?}");
247                false
248            }
249        }
250    }
251
252    fn truncate(&mut self, table: &str) -> bool {
253        let sql = format!("TRUNCATE TABLE {table}");
254        let (state, _) = self.execute(sql.as_str());
255        state
256    }
257}
258
259impl Mode for Pgsql {
260    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
261        let mut sql = String::new();
262        let mut comments = vec![];
263
264        if !options.table_unique.is_empty() {
265            let full_name = format!(
266                "{}_unique_{}",
267                options.table_name,
268                options.table_unique.join("_")
269            );
270            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
271            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
272            let unique = format!(
273                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
274                name,
275                options.table_name,
276                options.table_unique.join(",")
277            );
278            comments.push(unique);
279        }
280
281        for row in options.table_index.iter() {
282            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
283            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
284            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
285            let index = format!(
286                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
287                name,
288                options.table_name,
289                row.join(",")
290            );
291            comments.push(index);
292        }
293
294        for (name, field) in options.table_fields.entries_mut() {
295            field["table_name"] = options.table_name.clone().into();
296            let row = br_fields::field("pgsql", name, field.clone());
297            let (col_sql, meta) = if let Some(idx) = row.find("--") {
298                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
299            } else {
300                (row.trim(), None)
301            };
302            if let Some(meta) = meta {
303                comments.push(format!(
304                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
305                    options.table_name, name, meta
306                ));
307            }
308            sql = format!("{} {},\r\n", sql, col_sql);
309        }
310
311        let primary_key = format!(
312            "CONSTRAINT {}_{} PRIMARY KEY ({})",
313            options.table_name, options.table_key, options.table_key
314        );
315        let sql = format!(
316            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
317            options.table_name,
318            sql.trim_end_matches(",\r\n"),
319            primary_key
320        );
321        comments.insert(0, sql);
322
323        for (_name, field) in options.table_fields.entries() {
324            let _ = field["mode"].as_str();
325        }
326
327        if self.params.sql {
328            let info = comments.join("\r\n");
329            return JsonValue::from(info);
330        }
331        for comment in comments {
332            let (state, _) = self.execute(comment.as_str());
333            match state {
334                true => {}
335                false => {
336                    return JsonValue::from(state);
337                }
338            }
339        }
340        JsonValue::from(true)
341    }
342
343    fn table_update(&mut self, options: TableOptions) -> JsonValue {
344        // table_update 前清除缓存,确保获取最新表结构
345        let cache_key = format!("{}{}", self.default, options.table_name);
346        let table_fields_guard = match TABLE_FIELDS.read() {
347            Ok(g) => g,
348            Err(e) => e.into_inner(),
349        };
350        if table_fields_guard.get(&cache_key).is_some() {
351            drop(table_fields_guard);
352            let mut table_fields_guard = match TABLE_FIELDS.write() {
353                Ok(g) => g,
354                Err(e) => e.into_inner(),
355            };
356            table_fields_guard.remove(&cache_key);
357        } else {
358            drop(table_fields_guard);
359        }
360        let fields_list = self.table_info(&options.table_name);
361        let mut put = vec![];
362        let mut add = vec![];
363        let mut del = vec![];
364        let mut comments = vec![];
365
366        for (key, _) in fields_list.entries() {
367            if options.table_fields[key].is_empty() {
368                del.push(key);
369            }
370        }
371        for (name, field) in options.table_fields.entries() {
372            if !fields_list[name].is_empty() {
373                let old_info = &fields_list[name];
374                let new_field_sql = br_fields::field("pgsql", name, field.clone());
375
376                let old_comment = old_info["comment"].as_str().unwrap_or("");
377                let old_type = old_info["type"].as_str().unwrap_or("");
378
379                let new_comment = if let Some(idx) = new_field_sql.find("--") {
380                    new_field_sql[idx + 2..].trim()
381                } else {
382                    ""
383                };
384
385                let comment_matches =
386                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
387                        let old_parts: Vec<&str> = old_comment.split('|').collect();
388                        let new_parts: Vec<&str> = new_comment.split('|').collect();
389                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
390                            old_parts[..4] == new_parts[..4]
391                        } else {
392                            old_comment == new_comment
393                        }
394                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
395                        let old_parts: Vec<&str> = old_comment.split('|').collect();
396                        let new_parts: Vec<&str> = new_comment.split('|').collect();
397                        if old_parts.len() >= 2
398                            && new_parts.len() >= 2
399                            && old_parts.len() == new_parts.len()
400                        {
401                            let old_filtered: Vec<&str> = old_parts
402                                .iter()
403                                .filter(|v| **v != "true" && **v != "false")
404                                .copied()
405                                .collect();
406                            let new_filtered: Vec<&str> = new_parts
407                                .iter()
408                                .filter(|v| **v != "true" && **v != "false")
409                                .copied()
410                                .collect();
411                            old_filtered == new_filtered
412                        } else {
413                            old_comment == new_comment
414                        }
415                    } else {
416                        old_comment == new_comment
417                    };
418
419                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
420                let new_type = if sql_parts.len() > 1 {
421                    sql_parts[1].to_lowercase()
422                } else {
423                    String::new()
424                };
425
426                let type_matches = match old_type {
427                    "integer" => {
428                        new_type.contains("int")
429                            && !new_type.contains("bigint")
430                            && !new_type.contains("smallint")
431                    }
432                    "bigint" => new_type.contains("bigint"),
433                    "smallint" => new_type.contains("smallint"),
434                    "boolean" => new_type.contains("boolean"),
435                    "text" => new_type.contains("text"),
436                    "character varying" => {
437                        if !new_type.contains("varchar") {
438                            false
439                        } else {
440                            let old_len = old_info["max_length"].as_i64().unwrap_or(0);
441                            let new_len = new_type
442                                .trim_start_matches("varchar(")
443                                .trim_end_matches(')')
444                                .parse::<i64>()
445                                .unwrap_or(0);
446                            let matched = old_len == new_len || new_len == 0;
447                            if !matched {
448                                log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
449                            }
450                            old_len == new_len || new_len == 0
451                        }
452                    }
453                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
454                    "numeric" => {
455                        if !(new_type.contains("numeric") || new_type.contains("decimal")) {
456                            false
457                        } else {
458                            let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
459                            let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
460                            let inner = new_type
461                                .replace("numeric(", "")
462                                .replace("decimal(", "")
463                                .replace(')', "");
464                            let parts: Vec<&str> = inner.split(',').collect();
465                            let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
466                            let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
467                            old_prec == new_prec && old_scale == new_scale
468                        }
469                    }
470                    "double precision" => {
471                        new_type.contains("double") || new_type.contains("float8")
472                    }
473                    "real" => new_type.contains("real") || new_type.contains("float4"),
474                    "timestamp without time zone" | "timestamp with time zone" => {
475                        new_type.contains("timestamp")
476                    }
477                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
478                    "time without time zone" | "time with time zone" => {
479                        new_type.contains("time") && !new_type.contains("timestamp")
480                    }
481                    "json" | "jsonb" => new_type.contains("json"),
482                    "uuid" => new_type.contains("uuid"),
483                    "bytea" => new_type.contains("bytea"),
484                    _ => old_type == new_type,
485                };
486
487                if type_matches && comment_matches {
488                    continue;
489                }
490
491                log::debug!(
492                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
493                    options.table_name,
494                    name,
495                    type_matches,
496                    old_type,
497                    new_type,
498                    comment_matches
499                );
500                put.push(name);
501            } else {
502                add.push(name);
503            }
504        }
505
506        for name in add.iter() {
507            let name = name.to_string();
508            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
509            let rows = row.split("--").collect::<Vec<&str>>();
510            comments.push(format!(
511                r#"ALTER TABLE "{}" add {};"#,
512                options.table_name,
513                rows[0].trim()
514            ));
515            if rows.len() > 1 {
516                comments.push(format!(
517                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
518                    options.table_name,
519                    name,
520                    rows[1].trim()
521                ));
522            }
523        }
524        for name in del.iter() {
525            comments.push(format!(
526                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
527                options.table_name, name
528            ));
529        }
530        for name in put.iter() {
531            let name = name.to_string();
532            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
533            let rows = row.split("--").collect::<Vec<&str>>();
534
535            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
536
537            if sql[1].contains("BOOLEAN") {
538                let text = format!(
539                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
540                    options.table_name, name
541                );
542                comments.push(text.clone());
543                let text = format!(
544                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
545                    options.table_name, name, sql[1]
546                );
547                comments.push(text.clone());
548            } else {
549                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
550                let new_type_lower = sql[1].to_lowercase();
551                let is_date_to_numeric = (old_col_type == "date"
552                    || old_col_type.contains("timestamp"))
553                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
554                if is_date_to_numeric {
555                    comments.push(format!(
556                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
557                        options.table_name, name
558                    ));
559                    comments.push(format!(
560                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
561                        options.table_name, name, sql[1], name, name, name
562                    ));
563                } else {
564                    let text = format!(
565                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
566                        options.table_name, name, sql[1]
567                    );
568                    comments.push(text.clone());
569                }
570            };
571
572            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
573                let default_value = rows[0][default_pos + 9..].trim();
574                if !default_value.is_empty() {
575                    comments.push(format!(
576                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
577                        options.table_name, name, default_value
578                    ));
579                }
580            }
581            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
582            // 如果现有字段有 NOT NULL 约束,移除它
583            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
584                .as_str()
585                .unwrap_or("YES");
586            let old_is_required = old_is_nullable == "NO";
587
588            // 跳过主键字段,主键隐含 NOT NULL 约束
589            if old_is_required && name != options.table_key {
590                comments.push(format!(
591                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
592                    options.table_name, name
593                ));
594            }
595
596            if rows.len() > 1 {
597                comments.push(format!(
598                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
599                    options.table_name,
600                    name,
601                    rows[1].trim()
602                ));
603            }
604        }
605
606        let mut unique_new = vec![];
607        let mut index_new = vec![];
608        let mut primary_key = vec![];
609        let (_, index_list) = self.query(
610            format!(
611                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
612                options.table_name
613            )
614            .as_str(),
615        );
616        for item in index_list.members() {
617            let key_name = item["indexname"].as_str().unwrap_or("");
618            let indexdef = item["indexdef"].to_string();
619
620            if indexdef.contains(
621                format!(
622                    "CREATE UNIQUE INDEX {}_{} ON",
623                    options.table_name, options.table_key
624                )
625                .as_str(),
626            ) {
627                primary_key.push(key_name.to_string());
628                continue;
629            }
630            if indexdef.contains("CREATE UNIQUE INDEX") {
631                unique_new.push(key_name.to_string());
632                continue;
633            }
634            if indexdef.contains("CREATE INDEX") {
635                index_new.push(key_name.to_string());
636                continue;
637            }
638        }
639
640        if !options.table_unique.is_empty() {
641            let full_name = format!(
642                "{}_unique_{}",
643                options.table_name,
644                options.table_unique.join("_")
645            );
646            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
647            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
648            let unique = format!(
649                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
650                name,
651                options.table_name,
652                options.table_unique.join(",")
653            );
654            if !unique_new.contains(&name) {
655                comments.push(unique);
656            }
657            unique_new.retain(|x| *x != name);
658        }
659
660        for row in options.table_index.iter() {
661            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
662            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
663            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
664            let index = format!(
665                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
666                name,
667                options.table_name,
668                row.join(",")
669            );
670            if !index_new.contains(&name) {
671                comments.push(index);
672            }
673            index_new.retain(|x| *x != name);
674        }
675
676        for item in unique_new {
677            if item.ends_with("_pkey") {
678                continue;
679            }
680            if item.starts_with("unique_") {
681                comments.push(format!(
682                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
683                    options.table_name,
684                    item.clone()
685                ));
686            } else {
687                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
688            }
689        }
690        for item in index_new {
691            if item.ends_with("_pkey") {
692                continue;
693            }
694            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
695        }
696
697        if self.params.sql {
698            return JsonValue::from(comments.join(""));
699        }
700
701        if comments.is_empty() {
702            return JsonValue::from(-1);
703        }
704
705        for item in comments.iter() {
706            let (state, res) = self.execute(item.as_str());
707            match state {
708                true => {}
709                false => {
710                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
711                    return JsonValue::from(0);
712                }
713            }
714        }
715        JsonValue::from(1)
716    }
717
718    fn table_info(&mut self, table: &str) -> JsonValue {
719        // 读缓存(与 MySQL 一致的 TABLE_FIELDS RwLock 缓存)
720        let cache_key = format!("{}{}", self.default, table);
721        let table_fields_guard = match TABLE_FIELDS.read() {
722            Ok(g) => g,
723            Err(e) => e.into_inner(),
724        };
725        if let Some(cached) = table_fields_guard.get(&cache_key) {
726            return cached.clone();
727        }
728        drop(table_fields_guard);
729        let sql = format!(
730            "SELECT  COL.COLUMN_NAME,
731    COL.DATA_TYPE,
732    COL.IS_NULLABLE,
733    COL.CHARACTER_MAXIMUM_LENGTH,
734    COL.NUMERIC_PRECISION,
735    COL.NUMERIC_SCALE,
736    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
737    LEFT JOIN
738    pg_catalog.pg_description DESCRIPTION
739    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
740    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
741        let (state, data) = self.query(sql.as_str());
742        let mut list = object! {};
743        if state {
744            for item in data.members() {
745                let mut row = object! {};
746                row["field"] = item["column_name"].clone();
747                row["comment"] = item["comment"].clone();
748                row["type"] = item["data_type"].clone();
749                row["is_nullable"] = item["is_nullable"].clone();
750                row["max_length"] = item["character_maximum_length"].clone();
751                row["numeric_precision"] = item["numeric_precision"].clone();
752                row["numeric_scale"] = item["numeric_scale"].clone();
753                if let Some(field_name) = row["field"].as_str() {
754                    list[field_name] = row.clone();
755                }
756            }
757            // 写入缓存
758            let mut table_fields_guard = match TABLE_FIELDS.write() {
759                Ok(g) => g,
760                Err(e) => e.into_inner(),
761            };
762            table_fields_guard.insert(cache_key, list.clone());
763            list
764        } else {
765            list
766        }
767    }
768
769    fn table_is_exist(&mut self, name: &str) -> bool {
770        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
771        let (state, data) = self.query(sql.as_str());
772        match state {
773            true => {
774                for item in data.members() {
775                    if item.has_key("exists") {
776                        return item["exists"].as_bool().unwrap_or(false);
777                    }
778                }
779                false
780            }
781            false => false,
782        }
783    }
784
785    fn table(&mut self, name: &str) -> &mut Pgsql {
786        self.params = Params::default(self.connection.mode.str().as_str());
787        let table_name = format!("{}{}", self.connection.prefix, name);
788        if !super::sql_safety::validate_table_name(&table_name) {
789            error!("Invalid table name: {}", name);
790        }
791        self.params.table = table_name.clone();
792        self.params.join_table = table_name;
793        self
794    }
795
796    fn change_table(&mut self, name: &str) -> &mut Self {
797        self.params.join_table = name.to_string();
798        self
799    }
800
801    fn autoinc(&mut self) -> &mut Self {
802        self.params.autoinc = true;
803        self
804    }
805
806    fn timestamps(&mut self) -> &mut Self {
807        self.params.timestamps = true;
808        self
809    }
810
811    fn fetch_sql(&mut self) -> &mut Self {
812        self.params.sql = true;
813        self
814    }
815
816    fn order(&mut self, field: &str, by: bool) -> &mut Self {
817        self.params.order[field] = {
818            if by {
819                "DESC"
820            } else {
821                "ASC"
822            }
823        }
824        .into();
825        self
826    }
827
828    fn group(&mut self, field: &str) -> &mut Self {
829        let fields: Vec<&str> = field.split(",").collect();
830        for field in fields.iter() {
831            let field = field.to_string();
832            self.params.group[field.as_str()] = field.clone().into();
833            self.params.fields[field.as_str()] = field.clone().into();
834        }
835        self
836    }
837
838    fn distinct(&mut self) -> &mut Self {
839        self.params.distinct = true;
840        self
841    }
842
843    fn json(&mut self, field: &str) -> &mut Self {
844        let list: Vec<&str> = field.split(",").collect();
845        for item in list.iter() {
846            self.params.json[item.to_string().as_str()] = item.to_string().into();
847        }
848        self
849    }
850
851    fn location(&mut self, field: &str) -> &mut Self {
852        let list: Vec<&str> = field.split(",").collect();
853        for item in list.iter() {
854            self.params.location[item.to_string().as_str()] = item.to_string().into();
855        }
856        self
857    }
858
859    fn field(&mut self, field: &str) -> &mut Self {
860        let list: Vec<&str> = field.split(",").collect();
861        let join_table = if self.params.join_table.is_empty() {
862            self.params.table.clone()
863        } else {
864            self.params.join_table.clone()
865        };
866        for item in list.iter() {
867            let lower = item.to_lowercase();
868            let is_expr = lower.contains("count(")
869                || lower.contains("sum(")
870                || lower.contains("avg(")
871                || lower.contains("max(")
872                || lower.contains("min(")
873                || lower.contains("case ");
874            if is_expr {
875                self.params.fields[item.to_string().as_str()] = (*item).into();
876            } else if item.contains(" as ") {
877                let text = item.split(" as ").collect::<Vec<&str>>();
878                self.params.fields[item.to_string().as_str()] =
879                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
880            } else {
881                self.params.fields[item.to_string().as_str()] =
882                    format!("{join_table}.{item}").into();
883            }
884        }
885        self
886    }
887
888    fn field_raw(&mut self, expr: &str) -> &mut Self {
889        self.params.fields[expr] = expr.into();
890        self
891    }
892
893    fn hidden(&mut self, name: &str) -> &mut Self {
894        let hidden: Vec<&str> = name.split(",").collect();
895
896        let fields_list = self.table_info(self.params.clone().table.as_str());
897        let mut data = array![];
898        for item in fields_list.members() {
899            let _ = data.push(object! {
900                "name":item["field"].as_str().unwrap_or("")
901            });
902        }
903
904        for item in data.members() {
905            let name = item["name"].as_str().unwrap_or("");
906            if !hidden.contains(&name) {
907                self.params.fields[name] = name.into();
908            }
909        }
910        self
911    }
912
913    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
914        for f in field.split('|') {
915            if !super::sql_safety::validate_field_name(f) {
916                error!("Invalid field name: {}", f);
917            }
918        }
919        if !super::sql_safety::validate_compare_orator(compare) {
920            error!("Invalid compare operator: {}", compare);
921        }
922        let join_table = if self.params.join_table.is_empty() {
923            self.params.table.clone()
924        } else {
925            self.params.join_table.clone()
926        };
927        if value.is_boolean() {
928            let bool_val = value.as_bool().unwrap_or(false);
929            self.params
930                .where_and
931                .push(format!("{join_table}.{field} {compare} {bool_val}"));
932            return self;
933        }
934        match compare {
935            "between" => {
936                self.params.where_and.push(format!(
937                    "{}.{} between '{}' AND '{}'",
938                    join_table, field, value[0], value[1]
939                ));
940            }
941            "set" => {
942                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
943                let mut wheredata = vec![];
944                for item in list.iter() {
945                    wheredata.push(format!(
946                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
947                    ));
948                }
949                self.params
950                    .where_and
951                    .push(format!("({})", wheredata.join(" or ")));
952            }
953            "notin" => {
954                let mut text = String::new();
955                for item in value.members() {
956                    text = format!("{text},'{item}'");
957                }
958                text = text.trim_start_matches(",").into();
959                self.params
960                    .where_and
961                    .push(format!("{join_table}.{field} not in ({text})"));
962            }
963            "is" => {
964                self.params
965                    .where_and
966                    .push(format!("{join_table}.{field} is {value}"));
967            }
968            "isnot" => {
969                self.params
970                    .where_and
971                    .push(format!("{join_table}.{field} is not {value}"));
972            }
973            "notlike" => {
974                self.params
975                    .where_and
976                    .push(format!("{join_table}.{field} not like '{value}'"));
977            }
978            "in" => {
979                if value.is_array() && value.is_empty() {
980                    self.params.where_and.push("1=0".to_string());
981                    return self;
982                }
983                let mut text = String::new();
984                if value.is_array() {
985                    for item in value.members() {
986                        text = format!("{text},'{item}'");
987                    }
988                } else if value.is_null() {
989                    text = format!("{text},null");
990                } else {
991                    let value = value.as_str().unwrap_or("");
992
993                    let value: Vec<&str> = value.split(",").collect();
994                    for item in value.iter() {
995                        text = format!("{text},'{item}'");
996                    }
997                }
998                text = text.trim_start_matches(",").into();
999
1000                self.params
1001                    .where_and
1002                    .push(format!("{join_table}.{field} {compare} ({text})"));
1003            }
1004            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1005            // 用法:.where_and("tags", "json_contains", "紧急".into())
1006            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
1007            "json_contains" => {
1008                if value.is_array() {
1009                    if value.is_empty() {
1010                        self.params.where_and.push("1=0".to_string());
1011                    } else {
1012                        let mut parts = vec![];
1013                        for item in value.members() {
1014                            let escaped = super::sql_safety::escape_string(&item.to_string());
1015                            parts.push(format!(
1016                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1017                                escaped
1018                            ));
1019                        }
1020                        self.params
1021                            .where_and
1022                            .push(format!("({})", parts.join(" OR ")));
1023                    }
1024                } else {
1025                    let escaped = super::sql_safety::escape_string(&value.to_string());
1026                    self.params.where_and.push(format!(
1027                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1028                        escaped
1029                    ));
1030                }
1031            }
1032            _ => {
1033                self.params
1034                    .where_and
1035                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1036            }
1037        }
1038        self
1039    }
1040
1041    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1042        for f in field.split('|') {
1043            if !super::sql_safety::validate_field_name(f) {
1044                error!("Invalid field name: {}", f);
1045            }
1046        }
1047        if !super::sql_safety::validate_compare_orator(compare) {
1048            error!("Invalid compare operator: {}", compare);
1049        }
1050        let join_table = if self.params.join_table.is_empty() {
1051            self.params.table.clone()
1052        } else {
1053            self.params.join_table.clone()
1054        };
1055
1056        if value.is_boolean() {
1057            let bool_val = value.as_bool().unwrap_or(false);
1058            self.params
1059                .where_or
1060                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1061            return self;
1062        }
1063
1064        match compare {
1065            "between" => {
1066                self.params.where_or.push(format!(
1067                    "{}.{} between '{}' AND '{}'",
1068                    join_table, field, value[0], value[1]
1069                ));
1070            }
1071            "set" => {
1072                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1073                let mut wheredata = vec![];
1074                for item in list.iter() {
1075                    wheredata.push(format!(
1076                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1077                    ));
1078                }
1079                self.params
1080                    .where_or
1081                    .push(format!("({})", wheredata.join(" or ")));
1082            }
1083            "notin" => {
1084                let mut text = String::new();
1085                for item in value.members() {
1086                    text = format!("{text},'{item}'");
1087                }
1088                text = text.trim_start_matches(",").into();
1089                self.params
1090                    .where_or
1091                    .push(format!("{join_table}.{field} not in ({text})"));
1092            }
1093            "is" => {
1094                self.params
1095                    .where_or
1096                    .push(format!("{join_table}.{field} is {value}"));
1097            }
1098            "isnot" => {
1099                self.params
1100                    .where_or
1101                    .push(format!("{join_table}.{field} is not {value}"));
1102            }
1103            "in" => {
1104                if value.is_array() && value.is_empty() {
1105                    self.params.where_or.push("1=0".to_string());
1106                    return self;
1107                }
1108                let mut text = String::new();
1109                if value.is_array() {
1110                    for item in value.members() {
1111                        text = format!("{text},'{item}'");
1112                    }
1113                } else {
1114                    let value = value.as_str().unwrap_or("");
1115                    let value: Vec<&str> = value.split(",").collect();
1116                    for item in value.iter() {
1117                        text = format!("{text},'{item}'");
1118                    }
1119                }
1120                text = text.trim_start_matches(",").into();
1121                self.params
1122                    .where_or
1123                    .push(format!("{join_table}.{field} {compare} ({text})"));
1124            }
1125            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1126            // 用法:.where_or("tags", "json_contains", "紧急".into())
1127            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1128            "json_contains" => {
1129                if value.is_array() {
1130                    if value.is_empty() {
1131                        self.params.where_or.push("1=0".to_string());
1132                    } else {
1133                        let mut parts = vec![];
1134                        for item in value.members() {
1135                            let escaped = super::sql_safety::escape_string(&item.to_string());
1136                            parts.push(format!(
1137                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1138                                escaped
1139                            ));
1140                        }
1141                        self.params
1142                            .where_or
1143                            .push(format!("({})", parts.join(" OR ")));
1144                    }
1145                } else {
1146                    let escaped = super::sql_safety::escape_string(&value.to_string());
1147                    self.params.where_or.push(format!(
1148                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1149                        escaped
1150                    ));
1151                }
1152            }
1153            _ => {
1154                self.params
1155                    .where_or
1156                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1157            }
1158        }
1159        self
1160    }
1161
1162    fn where_raw(&mut self, expr: &str) -> &mut Self {
1163        self.params.where_and.push(expr.to_string());
1164        self
1165    }
1166
1167    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1168        self.params
1169            .where_and
1170            .push(format!("\"{field}\" IN ({sub_sql})"));
1171        self
1172    }
1173
1174    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1175        self.params
1176            .where_and
1177            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1178        self
1179    }
1180
1181    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1182        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1183        self
1184    }
1185
1186    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1187        self.params
1188            .where_and
1189            .push(format!("NOT EXISTS ({sub_sql})"));
1190        self
1191    }
1192
1193    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1194        self.params.where_column = format!(
1195            "{}.{} {} {}.{}",
1196            self.params.table, field_a, compare, self.params.table, field_b
1197        );
1198        self
1199    }
1200
1201    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1202        self.params
1203            .update_column
1204            .push(format!("{field_a} = {compare}"));
1205        self
1206    }
1207
1208    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1209        self.params.page = page;
1210        self.params.limit = limit;
1211        self
1212    }
1213
1214    fn limit(&mut self, count: i32) -> &mut Self {
1215        self.params.limit_only = count;
1216        self
1217    }
1218
1219    fn column(&mut self, field: &str) -> JsonValue {
1220        self.field(field);
1221        let sql = self.params.select_sql();
1222
1223        if self.params.sql {
1224            return JsonValue::from(sql);
1225        }
1226        let (state, data) = self.query(sql.as_str());
1227        match state {
1228            true => {
1229                let mut list = array![];
1230                for item in data.members() {
1231                    if self.params.json[field].is_empty() {
1232                        let _ = list.push(item[field].clone());
1233                    } else {
1234                        let data =
1235                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1236                        let _ = list.push(data);
1237                    }
1238                }
1239                list
1240            }
1241            false => {
1242                array![]
1243            }
1244        }
1245    }
1246
1247    fn count(&mut self) -> JsonValue {
1248        self.params.fields = json::object! {};
1249        self.params.fields["count"] = "count(*) as count".to_string().into();
1250        let sql = self.params.select_sql();
1251        if self.params.sql {
1252            return JsonValue::from(sql.clone());
1253        }
1254        let (state, data) = self.query(sql.as_str());
1255        if state {
1256            data[0]["count"].clone()
1257        } else {
1258            JsonValue::from(0)
1259        }
1260    }
1261
1262    fn max(&mut self, field: &str) -> JsonValue {
1263        self.params.fields[field] = format!("max({field}) as {field}").into();
1264        let sql = self.params.select_sql();
1265        if self.params.sql {
1266            return JsonValue::from(sql.clone());
1267        }
1268        let (state, data) = self.query(sql.as_str());
1269        if state {
1270            if data.len() > 1 {
1271                return data.clone();
1272            }
1273            data[0][field].clone()
1274        } else {
1275            JsonValue::from(0)
1276        }
1277    }
1278
1279    fn min(&mut self, field: &str) -> JsonValue {
1280        self.params.fields[field] = format!("min({field}) as {field}").into();
1281        let sql = self.params.select_sql();
1282        if self.params.sql {
1283            return JsonValue::from(sql.clone());
1284        }
1285        let (state, data) = self.query(sql.as_str());
1286        if state {
1287            if data.len() > 1 {
1288                return data;
1289            }
1290            data[0][field].clone()
1291        } else {
1292            JsonValue::from(0)
1293        }
1294    }
1295
1296    fn sum(&mut self, field: &str) -> JsonValue {
1297        self.params.fields[field] = format!("sum({field}) as {field}").into();
1298        let sql = self.params.select_sql();
1299        if self.params.sql {
1300            return JsonValue::from(sql.clone());
1301        }
1302        let (state, data) = self.query(sql.as_str());
1303        match state {
1304            true => {
1305                if data.len() > 1 {
1306                    return data;
1307                }
1308                data[0][field].clone()
1309            }
1310            false => JsonValue::from(0),
1311        }
1312    }
1313
1314    fn avg(&mut self, field: &str) -> JsonValue {
1315        self.params.fields[field] = format!("avg({field}) as {field}").into();
1316        let sql = self.params.select_sql();
1317        if self.params.sql {
1318            return JsonValue::from(sql.clone());
1319        }
1320        let (state, data) = self.query(sql.as_str());
1321        if state {
1322            if data.len() > 1 {
1323                return data;
1324            }
1325            data[0][field].clone()
1326        } else {
1327            JsonValue::from(0)
1328        }
1329    }
1330
1331    fn having(&mut self, expr: &str) -> &mut Self {
1332        self.params.having.push(expr.to_string());
1333        self
1334    }
1335
1336    fn select(&mut self) -> JsonValue {
1337        let sql = self.params.select_sql();
1338        if self.params.sql {
1339            return JsonValue::from(sql.clone());
1340        }
1341        let (state, mut data) = self.query(sql.as_str());
1342        match state {
1343            true => {
1344                for (field, _) in self.params.json.entries() {
1345                    for item in data.members_mut() {
1346                        if !item[field].is_empty() {
1347                            let json = item[field].to_string();
1348                            item[field] = match json::parse(&json) {
1349                                Ok(e) => e,
1350                                Err(_) => JsonValue::from(json),
1351                            };
1352                        }
1353                    }
1354                }
1355                data.clone()
1356            }
1357            false => array![],
1358        }
1359    }
1360
1361    fn find(&mut self) -> JsonValue {
1362        self.params.page = 1;
1363        self.params.limit = 1;
1364        let sql = self.params.select_sql();
1365        if self.params.sql {
1366            return JsonValue::from(sql.clone());
1367        }
1368        let (state, mut data) = self.query(sql.as_str());
1369        match state {
1370            true => {
1371                if data.is_empty() {
1372                    return object! {};
1373                }
1374                for (field, _) in self.params.json.entries() {
1375                    if !data[0][field].is_empty() {
1376                        let json = data[0][field].to_string();
1377                        let json = json::parse(&json).unwrap_or(array![]);
1378                        data[0][field] = json;
1379                    } else {
1380                        data[0][field] = array![];
1381                    }
1382                }
1383                data[0].clone()
1384            }
1385            false => {
1386                object! {}
1387            }
1388        }
1389    }
1390
1391    fn value(&mut self, field: &str) -> JsonValue {
1392        self.params.fields = object! {};
1393        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1394        self.params.page = 1;
1395        self.params.limit = 1;
1396        let sql = self.params.select_sql();
1397        if self.params.sql {
1398            return JsonValue::from(sql.clone());
1399        }
1400        let (state, mut data) = self.query(sql.as_str());
1401        match state {
1402            true => {
1403                for (field, _) in self.params.json.entries() {
1404                    if !data[0][field].is_empty() {
1405                        let json = data[0][field].to_string();
1406                        let json = json::parse(&json).unwrap_or(array![]);
1407                        data[0][field] = json;
1408                    } else {
1409                        data[0][field] = array![];
1410                    }
1411                }
1412                data[0][field].clone()
1413            }
1414            false => {
1415                if self.connection.debug {
1416                    info!("{data:?}");
1417                }
1418                JsonValue::Null
1419            }
1420        }
1421    }
1422
1423    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1424        let fields_list = self.table_info(&self.params.table.clone());
1425        let mut fields = vec![];
1426        let mut values = vec![];
1427        if !self.params.autoinc && data["id"].is_empty() {
1428            let thread_id = format!("{:?}", std::thread::current().id());
1429            let thread_num: u64 = thread_id
1430                .trim_start_matches("ThreadId(")
1431                .trim_end_matches(")")
1432                .parse()
1433                .unwrap_or(0);
1434            data["id"] = format!(
1435                "{:X}{:X}",
1436                Local::now().timestamp_nanos_opt().unwrap_or(0),
1437                thread_num
1438            )
1439            .into();
1440        }
1441        for (field, value) in data.entries() {
1442            fields.push(format!("\"{}\"", field));
1443
1444            if value.is_string() {
1445                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1446                continue;
1447            } else if value.is_array() {
1448                if self.params.json[field].is_empty() {
1449                    let array = value
1450                        .members()
1451                        .map(|x| x.as_str().unwrap_or(""))
1452                        .collect::<Vec<&str>>()
1453                        .join(",");
1454                    values.push(format!("'{array}'"));
1455                } else {
1456                    let json = value.to_string();
1457                    let json = json.replace("'", "''");
1458                    values.push(format!("'{json}'"));
1459                }
1460                continue;
1461            } else if value.is_object() {
1462                if self.params.json[field].is_empty() {
1463                    values.push(format!("'{value}'"));
1464                } else {
1465                    let json = value.to_string();
1466                    let json = json.replace("'", "''");
1467                    values.push(format!("'{json}'"));
1468                }
1469                continue;
1470            } else if value.is_number() {
1471                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1472                if col_type == "boolean" {
1473                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1474                    values.push(format!("{bool_val}"));
1475                } else if col_type.contains("int") {
1476                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1477                } else {
1478                    values.push(format!("{value}"));
1479                }
1480                continue;
1481            } else if value.is_boolean() || value.is_null() {
1482                values.push(format!("{value}"));
1483                continue;
1484            } else {
1485                values.push(format!("'{value}'"));
1486                continue;
1487            }
1488        }
1489        let fields = fields.join(",");
1490        let values = values.join(",");
1491
1492        let sql = format!(
1493            "INSERT INTO {} ({}) VALUES ({});",
1494            self.params.table, fields, values
1495        );
1496        if self.params.sql {
1497            return JsonValue::from(sql.clone());
1498        }
1499        let (state, ids) = self.execute(sql.as_str());
1500
1501        match state {
1502            true => match self.params.autoinc {
1503                true => ids.clone(),
1504                false => data["id"].clone(),
1505            },
1506            false => {
1507                let thread_id = format!("{:?}", thread::current().id());
1508                error!("添加失败: {thread_id} {ids:?} {sql}");
1509                JsonValue::from("")
1510            }
1511        }
1512    }
1513    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1514        let fields_list = self.table_info(&self.params.table.clone());
1515        let mut fields = String::new();
1516        if !self.params.autoinc && data[0]["id"].is_empty() {
1517            data[0]["id"] = "".into();
1518        }
1519        for (field, _) in data[0].entries() {
1520            fields = format!("{fields},\"{field}\"");
1521        }
1522        fields = fields.trim_start_matches(",").to_string();
1523
1524        let core_count = num_cpus::get();
1525        let mut p = pools::Pool::new(core_count * 4);
1526
1527        let autoinc = self.params.autoinc;
1528        for list in data.members() {
1529            let mut item = list.clone();
1530            let i = br_fields::str::Code::verification_code(3);
1531            let fields_list_new = fields_list.clone();
1532            p.execute(move |pcindex| {
1533                if !autoinc && item["id"].is_empty() {
1534                    let id = format!(
1535                        "{:X}{:X}{}",
1536                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1537                        pcindex,
1538                        i
1539                    );
1540                    item["id"] = id.into();
1541                }
1542                let mut values = "".to_string();
1543                for (field, value) in item.entries() {
1544                    if value.is_string() {
1545                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1546                    } else if value.is_number() {
1547                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1548                        if col_type == "boolean" {
1549                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1550                            values = format!("{values},{bool_val}");
1551                        } else if col_type.contains("int") {
1552                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1553                        } else {
1554                            values = format!("{values},{value}");
1555                        }
1556                    } else if value.is_boolean() {
1557                        values = format!("{values},{value}");
1558                        continue;
1559                    } else {
1560                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1561                    }
1562                }
1563                values = format!("({})", values.trim_start_matches(","));
1564                array![item["id"].clone(), values]
1565            });
1566        }
1567        let (ids_list, mut values) = p.insert_all();
1568        values = values.trim_start_matches(",").to_string();
1569        let sql = format!(
1570            "INSERT INTO {} ({}) VALUES {};",
1571            self.params.table, fields, values
1572        );
1573
1574        if self.params.sql {
1575            return JsonValue::from(sql.clone());
1576        }
1577        let (state, data) = self.execute(sql.as_str());
1578        match state {
1579            true => match autoinc {
1580                true => data,
1581                false => JsonValue::from(ids_list),
1582            },
1583            false => {
1584                error!("insert_all: {data:?}");
1585                let _ = std::fs::write("pgsql_insert_all_error.log", format!("insert_all error:\n{:?}\n\nSQL:\n{}", data, sql));
1586                array![]
1587            }
1588        }
1589    }
1590    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1591        let fields_list = self.table_info(&self.params.table.clone());
1592        let mut fields = vec![];
1593        let mut values = vec![];
1594        if !self.params.autoinc && data["id"].is_empty() {
1595            let thread_id = format!("{:?}", std::thread::current().id());
1596            let thread_num: u64 = thread_id
1597                .trim_start_matches("ThreadId(")
1598                .trim_end_matches(")")
1599                .parse()
1600                .unwrap_or(0);
1601            data["id"] = format!(
1602                "{:X}{:X}",
1603                Local::now().timestamp_nanos_opt().unwrap_or(0),
1604                thread_num
1605            )
1606            .into();
1607        }
1608        for (field, value) in data.entries() {
1609            fields.push(format!("\"{}\"", field));
1610
1611            if value.is_string() {
1612                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1613                continue;
1614            } else if value.is_array() {
1615                if self.params.json[field].is_empty() {
1616                    let array = value
1617                        .members()
1618                        .map(|x| x.as_str().unwrap_or(""))
1619                        .collect::<Vec<&str>>()
1620                        .join(",");
1621                    values.push(format!("'{array}'"));
1622                } else {
1623                    let json = value.to_string();
1624                    let json = json.replace("'", "''");
1625                    values.push(format!("'{json}'"));
1626                }
1627                continue;
1628            } else if value.is_object() {
1629                if self.params.json[field].is_empty() {
1630                    values.push(format!("'{value}'"));
1631                } else {
1632                    let json = value.to_string();
1633                    let json = json.replace("'", "''");
1634                    values.push(format!("'{json}'"));
1635                }
1636                continue;
1637            } else if value.is_number() {
1638                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1639                if col_type == "boolean" {
1640                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1641                    values.push(format!("{bool_val}"));
1642                } else if col_type.contains("int") {
1643                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1644                } else {
1645                    values.push(format!("{value}"));
1646                }
1647                continue;
1648            } else if value.is_boolean() || value.is_null() {
1649                values.push(format!("{value}"));
1650                continue;
1651            } else {
1652                values.push(format!("'{value}'"));
1653                continue;
1654            }
1655        }
1656
1657        let conflict_cols: Vec<String> = conflict_fields
1658            .iter()
1659            .map(|f| format!("\"{}\"", f))
1660            .collect();
1661
1662        let update_set: Vec<String> = fields
1663            .iter()
1664            .filter(|f| {
1665                let name = f.trim_matches('"');
1666                !conflict_fields.contains(&name) && name != "id"
1667            })
1668            .map(|f| format!("{f}=EXCLUDED.{f}"))
1669            .collect();
1670
1671        let fields_str = fields.join(",");
1672        let values_str = values.join(",");
1673
1674        let sql = format!(
1675            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1676            self.params.table,
1677            fields_str,
1678            values_str,
1679            conflict_cols.join(","),
1680            update_set.join(",")
1681        );
1682        if self.params.sql {
1683            return JsonValue::from(sql.clone());
1684        }
1685        let (state, result) = self.execute(sql.as_str());
1686        match state {
1687            true => match self.params.autoinc {
1688                true => result.clone(),
1689                false => data["id"].clone(),
1690            },
1691            false => {
1692                let thread_id = format!("{:?}", thread::current().id());
1693                error!("upsert失败: {thread_id} {result:?} {sql}");
1694                JsonValue::from("")
1695            }
1696        }
1697    }
1698    fn update(&mut self, data: JsonValue) -> JsonValue {
1699        let fields_list = self.table_info(&self.params.table.clone());
1700        let mut values = vec![];
1701        for (field, value) in data.entries() {
1702            if value.is_string() {
1703                values.push(format!(
1704                    "\"{}\"='{}'",
1705                    field,
1706                    value.to_string().replace("'", "''")
1707                ));
1708            } else if value.is_number() {
1709                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1710                if col_type == "boolean" {
1711                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1712                    values.push(format!("\"{field}\"= {bool_val}"));
1713                } else if col_type.contains("int") {
1714                    values.push(format!(
1715                        "\"{}\"= {}",
1716                        field,
1717                        value.as_f64().unwrap_or(0.0) as i64
1718                    ));
1719                } else {
1720                    values.push(format!("\"{field}\"= {value}"));
1721                }
1722            } else if value.is_array() {
1723                if self.params.json[field].is_empty() {
1724                    let array = value
1725                        .members()
1726                        .map(|x| x.as_str().unwrap_or(""))
1727                        .collect::<Vec<&str>>()
1728                        .join(",");
1729                    values.push(format!("\"{field}\"='{array}'"));
1730                } else {
1731                    let json = value.to_string();
1732                    let json = json.replace("'", "''");
1733                    values.push(format!("\"{field}\"='{json}'"));
1734                }
1735                continue;
1736            } else if value.is_object() {
1737                if self.params.json[field].is_empty() {
1738                    values.push(format!("\"{field}\"='{value}'"));
1739                } else {
1740                    if value.is_empty() {
1741                        values.push(format!("\"{field}\"=''"));
1742                        continue;
1743                    }
1744                    let json = value.to_string();
1745                    let json = json.replace("'", "''");
1746                    values.push(format!("\"{field}\"='{json}'"));
1747                }
1748                continue;
1749            } else if value.is_boolean() {
1750                values.push(format!("\"{field}\"= {value}"));
1751            } else {
1752                values.push(format!("\"{field}\"=\"{value}\""));
1753            }
1754        }
1755
1756        for (field, value) in self.params.inc_dec.entries() {
1757            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1758        }
1759        if !self.params.update_column.is_empty() {
1760            values.extend(self.params.update_column.clone());
1761        }
1762        let values = values.join(",");
1763
1764        let sql = format!(
1765            "UPDATE {} SET {} {};",
1766            self.params.table.clone(),
1767            values,
1768            self.params.where_sql()
1769        );
1770        if self.params.sql {
1771            return JsonValue::from(sql.clone());
1772        }
1773        let (state, data) = self.execute(sql.as_str());
1774        if state {
1775            data
1776        } else {
1777            let thread_id = format!("{:?}", thread::current().id());
1778            error!("update: {thread_id} {data:?} {sql}");
1779            0.into()
1780        }
1781    }
1782    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1783        let fields_list = self.table_info(&self.params.table.clone());
1784        let mut values = vec![];
1785
1786        let mut ids = vec![];
1787        for (field, _) in data[0].entries() {
1788            if field == "id" {
1789                continue;
1790            }
1791            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1792            let mut fields = vec![];
1793            for row in data.members() {
1794                let value = row[field].clone();
1795                let id = row["id"].clone();
1796                ids.push(id.clone());
1797                if value.is_string() {
1798                    fields.push(format!(
1799                        "WHEN '{}' THEN '{}'",
1800                        id,
1801                        value.to_string().replace("'", "''")
1802                    ));
1803                } else if value.is_array() || value.is_object() {
1804                    if self.params.json[field].is_empty() {
1805                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1806                    } else {
1807                        let json = value.to_string();
1808                        let json = json.replace("'", "''");
1809                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1810                    }
1811                    continue;
1812                } else if value.is_number() {
1813                    if col_type == "boolean" {
1814                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1815                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1816                    } else {
1817                        fields.push(format!("WHEN '{id}' THEN {value}"));
1818                    }
1819                } else if value.is_boolean() || value.is_null() {
1820                    fields.push(format!("WHEN '{id}' THEN {value}"));
1821                } else {
1822                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1823                }
1824            }
1825            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1826        }
1827        self.where_and("id", "in", ids.into());
1828        for (field, value) in self.params.inc_dec.entries() {
1829            values.push(format!("{} = {}", field, value.to_string().clone()));
1830        }
1831
1832        let values = values.join(",");
1833        let sql = format!(
1834            "UPDATE {} SET {} {} {};",
1835            self.params.table.clone(),
1836            values,
1837            self.params.where_sql(),
1838            self.params.page_limit_sql()
1839        );
1840        if self.params.sql {
1841            return JsonValue::from(sql.clone());
1842        }
1843        let (state, data) = self.execute(sql.as_str());
1844        if state {
1845            data
1846        } else {
1847            error!("update_all: {data:?}");
1848            JsonValue::from(0)
1849        }
1850    }
1851
1852    fn delete(&mut self) -> JsonValue {
1853        let sql = format!(
1854            "delete FROM {} {} {};",
1855            self.params.table.clone(),
1856            self.params.where_sql(),
1857            self.params.page_limit_sql()
1858        );
1859        if self.params.sql {
1860            return JsonValue::from(sql.clone());
1861        }
1862        let (state, data) = self.execute(sql.as_str());
1863        match state {
1864            true => data,
1865            false => {
1866                error!("delete 失败>>> {data:?}");
1867                JsonValue::from(0)
1868            }
1869        }
1870    }
1871
1872    fn transaction(&mut self) -> bool {
1873        let thread_id = format!("{:?}", thread::current().id());
1874        let key = format!("{}{}", self.default, thread_id);
1875
1876        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1877            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1878            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1879            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1880            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1881            return true;
1882        }
1883
1884        // 获取事务连接,失败时重试2次(短退避)
1885        let mut conn = None;
1886        for attempt in 0..3u8 {
1887            match self.client.get_connect_for_transaction() {
1888                Ok(mut c) => {
1889                    if c.is_valid() {
1890                        conn = Some(c);
1891                        break;
1892                    }
1893                    warn!("事务连接无效(第{}次)", attempt + 1);
1894                    self.client.release_transaction_conn();
1895                }
1896                Err(e) => {
1897                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1898                }
1899            }
1900            if attempt < 2 {
1901                thread::sleep(std::time::Duration::from_millis(200));
1902            }
1903        }
1904        let mut conn = match conn {
1905            Some(c) => c,
1906            None => {
1907                error!("获取事务连接重试耗尽");
1908                return false;
1909            }
1910        };
1911
1912        if let Err(e) = conn.execute("START TRANSACTION") {
1913            error!("启动事务失败: {e}");
1914            self.client.release_transaction_conn();
1915            return false;
1916        }
1917
1918
1919        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1920        true
1921    }
1922    fn commit(&mut self) -> bool {
1923        let thread_id = format!("{:?}", thread::current().id());
1924        let key = format!("{}{}", self.default, thread_id);
1925
1926        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1927            error!("commit: 没有活跃的事务");
1928            return false;
1929        }
1930
1931        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1932        if depth > 1 {
1933            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1934            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1935            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1936            return true;
1937        }
1938
1939        let commit_result =
1940            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1941
1942        let success = match commit_result {
1943            Some(Ok(_)) => true,
1944            Some(Err(e)) => {
1945                error!("提交事务失败: {e}");
1946                false
1947            }
1948            None => {
1949                error!("提交事务失败: 未找到连接");
1950                false
1951            }
1952        };
1953
1954        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1955        self.client.release_transaction_conn();
1956        success
1957    }
1958
1959    fn rollback(&mut self) -> bool {
1960        let thread_id = format!("{:?}", thread::current().id());
1961        let key = format!("{}{}", self.default, thread_id);
1962
1963        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1964            error!("rollback: 没有活跃的事务");
1965            return false;
1966        }
1967
1968        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1969        if depth > 1 {
1970            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1971            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1972            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1973            return true;
1974        }
1975
1976        let rollback_result =
1977            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1978
1979        let success = match rollback_result {
1980            Some(Ok(_)) => true,
1981            Some(Err(e)) => {
1982                error!("回滚失败: {e}");
1983                false
1984            }
1985            None => {
1986                error!("回滚失败: 未找到连接");
1987                false
1988            }
1989        };
1990
1991        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1992        self.client.release_transaction_conn();
1993        success
1994    }
1995
1996    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1997        let (state, data) = self.query(sql);
1998        match state {
1999            true => Ok(data),
2000            false => Err(data.to_string()),
2001        }
2002    }
2003
2004    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2005        let (state, data) = self.execute(sql);
2006        match state {
2007            true => Ok(data),
2008            false => Err(data.to_string()),
2009        }
2010    }
2011
2012    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2013        self.params.inc_dec[field] = format!("{field} + {num}").into();
2014        self
2015    }
2016
2017    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2018        self.params.inc_dec[field] = format!("{field} - {num}").into();
2019        self
2020    }
2021    fn buildsql(&mut self) -> String {
2022        self.fetch_sql();
2023        let sql = self.select().to_string();
2024        format!("( {} ) {}", sql, self.params.table)
2025    }
2026
2027    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2028        for field in fields {
2029            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2030        }
2031        self
2032    }
2033
2034    fn join(
2035        &mut self,
2036        main_table: &str,
2037        main_fields: &str,
2038        right_table: &str,
2039        right_fields: &str,
2040    ) -> &mut Self {
2041        let main_table = if main_table.is_empty() {
2042            self.params.table.clone()
2043        } else {
2044            main_table.to_string()
2045        };
2046        self.params.join_table = right_table.to_string();
2047        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2048        self
2049    }
2050
2051    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2052        let main_fields = if main_fields.is_empty() {
2053            "id"
2054        } else {
2055            main_fields
2056        };
2057        let second_fields = if second_fields.is_empty() {
2058            self.params.table.clone()
2059        } else {
2060            second_fields.to_string().clone()
2061        };
2062        let sec_table_name = format!("{}{}", table, "_2");
2063        let second_table = format!("{} {}", table, sec_table_name.clone());
2064        self.params.join_table = sec_table_name.clone();
2065        self.params.join.push(format!(
2066            " INNER JOIN {} ON {}.{} = {}.{}",
2067            second_table, self.params.table, main_fields, sec_table_name, second_fields
2068        ));
2069        self
2070    }
2071
2072    fn join_right(
2073        &mut self,
2074        main_table: &str,
2075        main_fields: &str,
2076        right_table: &str,
2077        right_fields: &str,
2078    ) -> &mut Self {
2079        let main_table = if main_table.is_empty() {
2080            self.params.table.clone()
2081        } else {
2082            main_table.to_string()
2083        };
2084        self.params.join_table = right_table.to_string();
2085        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2086        self
2087    }
2088
2089    fn join_full(
2090        &mut self,
2091        main_table: &str,
2092        main_fields: &str,
2093        right_table: &str,
2094        right_fields: &str,
2095    ) -> &mut Self {
2096        let main_table = if main_table.is_empty() {
2097            self.params.table.clone()
2098        } else {
2099            main_table.to_string()
2100        };
2101        self.params.join_table = right_table.to_string();
2102        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2103        self
2104    }
2105
2106    fn union(&mut self, sub_sql: &str) -> &mut Self {
2107        self.params.unions.push(format!("UNION {sub_sql}"));
2108        self
2109    }
2110
2111    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2112        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2113        self
2114    }
2115
2116    fn lock_for_update(&mut self) -> &mut Self {
2117        self.params.lock_mode = "FOR UPDATE".to_string();
2118        self
2119    }
2120
2121    fn lock_for_share(&mut self) -> &mut Self {
2122        self.params.lock_mode = "FOR SHARE".to_string();
2123        self
2124    }
2125}