Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
11#[derive(Clone)]
12pub struct Pgsql {
13    /// 当前连接配置
14    pub connection: Connection,
15    /// 当前选中配置
16    pub default: String,
17    pub params: Params,
18    pub client: Pools,
19}
20
21impl Pgsql {
22    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23        let port = connection
24            .hostport
25            .parse::<i32>()
26            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28        let cp_connection = connection.clone();
29        let config = object! {
30            debug: cp_connection.debug,
31            username: cp_connection.username,
32            userpass: cp_connection.userpass,
33            database: cp_connection.database,
34            hostname: cp_connection.hostname,
35            hostport: port,
36            charset: cp_connection.charset.str(),
37        };
38        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40        let pools = pgsql.pools()?;
41        Ok(Self {
42            connection,
43            default: default.clone(),
44            params: Params::default("pgsql"),
45            client: pools,
46        })
47    }
48
49    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
50        let thread_id = format!("{:?}", thread::current().id());
51        let key = format!("{}{}", self.default, thread_id);
52
53        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
54            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
55
56            match result {
57                Some(Ok(e)) => {
58                    if self.connection.debug {
59                        info!("查询成功: {} {}", thread_id.clone(), sql);
60                    }
61                    (true, e.rows)
62                }
63                Some(Err(e)) => {
64                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
65                    (false, JsonValue::from(e.to_string()))
66                }
67                None => {
68                    error!("事务查询失败: 未找到事务连接 {thread_id}");
69                    (false, JsonValue::from("未找到事务连接"))
70                }
71            }
72        } else {
73            let mut guard = match self.client.get_guard() {
74                Ok(g) => g,
75                Err(e) => {
76                    // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
77                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
78                    return (false, JsonValue::from(e.to_string()));
79                }
80            };
81            match guard.conn().query(sql) {
82                Ok(e) => {
83                    if self.connection.debug {
84                        info!("查询成功: {} {}", thread_id.clone(), sql);
85                    }
86                    (true, e.rows)
87                }
88                Err(ref e) if Self::is_retriable_error(e) => {
89                    // 查询执行时连接断开,丢弃坏连接后重试一次
90
91                    guard.discard();
92                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
93                    self.client.flush_idle();
94                    warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
95                    thread::sleep(std::time::Duration::from_millis(200));
96                    let mut guard2 = match self.client.get_guard() {
97                        Ok(g) => g,
98                        Err(e) => {
99                            error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
100                            return (false, JsonValue::from(e.to_string()));
101                        }
102                    };
103                    match guard2.conn().query(sql) {
104                        Ok(e) => {
105                            if self.connection.debug {
106                                info!("查询成功(重试): {} {}", thread_id.clone(), sql);
107                            }
108                            (true, e.rows)
109                        }
110                        Err(e) => {
111                            error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
112                            (false, JsonValue::from(e.to_string()))
113                        }
114                    }
115                }
116                Err(e) => {
117                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
118                    (false, JsonValue::from(e.to_string()))
119                }
120            }
121        }
122    }
123    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
124        let thread_id = format!("{:?}", thread::current().id());
125        let key = format!("{}{}", self.default, thread_id);
126
127        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
128            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
129
130            match result {
131                Some(Ok(e)) => {
132                    if self.connection.debug {
133                        info!("提交成功: {} {}", thread_id.clone(), sql);
134                    }
135                    if sql.contains("INSERT") {
136                        (true, e.rows)
137                    } else {
138                        (true, e.affect_count.into())
139                    }
140                }
141                Some(Err(e)) => {
142                    error!("事务提交失败: {thread_id} {e}");
143                    (false, JsonValue::from(e.to_string()))
144                }
145                None => {
146                    error!("事务执行失败: 未找到事务连接 {thread_id}");
147                    (false, JsonValue::from("未找到事务连接"))
148                }
149            }
150        } else {
151            // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
152            let mut guard = match self.client.get_guard() {
153                Ok(g) => g,
154                Err(e) => {
155                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
156                    return (false, JsonValue::from(e.to_string()));
157                }
158            };
159            match guard.conn().execute(sql) {
160                Ok(e) => {
161                    if self.connection.debug {
162                        info!("提交成功: {} {}", thread_id.clone(), sql);
163                    }
164                    if sql.contains("INSERT") {
165                        (true, e.rows)
166                    } else {
167                        (true, e.affect_count.into())
168                    }
169                }
170                Err(ref e) if Self::is_retriable_error(e) => {
171                    // 执行时连接断开,丢弃坏连接后重试一次
172
173                    guard.discard();
174                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
175                    self.client.flush_idle();
176                    warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
177                    thread::sleep(std::time::Duration::from_millis(200));
178                    let mut guard2 = match self.client.get_guard() {
179                        Ok(g) => g,
180                        Err(e) => {
181                            error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
182                            return (false, JsonValue::from(e.to_string()));
183                        }
184                    };
185                    match guard2.conn().execute(sql) {
186                        Ok(e) => {
187                            if self.connection.debug {
188                                info!("提交成功(重试): {} {}", thread_id.clone(), sql);
189                            }
190                            if sql.contains("INSERT") {
191                                (true, e.rows)
192                            } else {
193                                (true, e.affect_count.into())
194                            }
195                        }
196                        Err(e) => {
197                            error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
198                            (false, JsonValue::from(e.to_string()))
199                        }
200                    }
201                }
202                Err(e) => {
203                    error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
204                    (false, JsonValue::from(e.to_string()))
205                }
206            }
207        }
208    }
209
210    /// 判断是否为可重试的连接类错误(连接断开、IO错误、超时)
211    fn is_retriable_error(e: &PgsqlError) -> bool {
212        matches!(
213            e,
214            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
215        )
216    }
217}
218
219impl DbMode for Pgsql {
220    fn database_tables(&mut self) -> JsonValue {
221        let sql = "SHOW TABLES".to_string();
222        match self.sql(sql.as_str()) {
223            Ok(e) => {
224                let mut list = vec![];
225                for item in e.members() {
226                    for (_, value) in item.entries() {
227                        list.push(value.clone());
228                    }
229                }
230                list.into()
231            }
232            Err(_) => {
233                array![]
234            }
235        }
236    }
237
238    fn database_create(&mut self, name: &str) -> bool {
239        let sql = format!("CREATE DATABASE {name}");
240
241        let (state, data) = self.execute(sql.as_str());
242        match state {
243            true => data.as_bool().unwrap_or(true),
244            false => {
245                error!("创建数据库失败: {data:?}");
246                false
247            }
248        }
249    }
250
251    fn truncate(&mut self, table: &str) -> bool {
252        let sql = format!("TRUNCATE TABLE {table}");
253        let (state, _) = self.execute(sql.as_str());
254        state
255    }
256}
257
258impl Mode for Pgsql {
259    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
260        let mut sql = String::new();
261        let mut comments = vec![];
262
263        if !options.table_unique.is_empty() {
264            let full_name = format!(
265                "{}_unique_{}",
266                options.table_name,
267                options.table_unique.join("_")
268            );
269            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
270            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
271            let unique = format!(
272                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
273                name,
274                options.table_name,
275                options.table_unique.join(",")
276            );
277            comments.push(unique);
278        }
279
280        for row in options.table_index.iter() {
281            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
282            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
283            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
284            let index = format!(
285                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
286                name,
287                options.table_name,
288                row.join(",")
289            );
290            comments.push(index);
291        }
292
293        for (name, field) in options.table_fields.entries_mut() {
294            field["table_name"] = options.table_name.clone().into();
295            let row = br_fields::field("pgsql", name, field.clone());
296            let (col_sql, meta) = if let Some(idx) = row.find("--") {
297                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
298            } else {
299                (row.trim(), None)
300            };
301            if let Some(meta) = meta {
302                comments.push(format!(
303                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
304                    options.table_name, name, meta
305                ));
306            }
307            sql = format!("{} {},\r\n", sql, col_sql);
308        }
309
310        let primary_key = format!(
311            "CONSTRAINT {}_{} PRIMARY KEY ({})",
312            options.table_name, options.table_key, options.table_key
313        );
314        let sql = format!(
315            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
316            options.table_name,
317            sql.trim_end_matches(",\r\n"),
318            primary_key
319        );
320        comments.insert(0, sql);
321
322        for (_name, field) in options.table_fields.entries() {
323            let _ = field["mode"].as_str();
324        }
325
326        if self.params.sql {
327            let info = comments.join("\r\n");
328            return JsonValue::from(info);
329        }
330        for comment in comments {
331            let (state, _) = self.execute(comment.as_str());
332            match state {
333                true => {}
334                false => {
335                    return JsonValue::from(state);
336                }
337            }
338        }
339        JsonValue::from(true)
340    }
341
342    fn table_update(&mut self, options: TableOptions) -> JsonValue {
343        let fields_list = self.table_info(&options.table_name);
344        let mut put = vec![];
345        let mut add = vec![];
346        let mut del = vec![];
347        let mut comments = vec![];
348
349        for (key, _) in fields_list.entries() {
350            if options.table_fields[key].is_empty() {
351                del.push(key);
352            }
353        }
354        for (name, field) in options.table_fields.entries() {
355            if !fields_list[name].is_empty() {
356                let old_info = &fields_list[name];
357                let new_field_sql = br_fields::field("pgsql", name, field.clone());
358
359                let old_comment = old_info["comment"].as_str().unwrap_or("");
360                let old_type = old_info["type"].as_str().unwrap_or("");
361
362                let new_comment = if let Some(idx) = new_field_sql.find("--") {
363                    new_field_sql[idx + 2..].trim()
364                } else {
365                    ""
366                };
367
368                let comment_matches =
369                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
370                        let old_parts: Vec<&str> = old_comment.split('|').collect();
371                        let new_parts: Vec<&str> = new_comment.split('|').collect();
372                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
373                            old_parts[..4] == new_parts[..4]
374                        } else {
375                            old_comment == new_comment
376                        }
377                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
378                        let old_parts: Vec<&str> = old_comment.split('|').collect();
379                        let new_parts: Vec<&str> = new_comment.split('|').collect();
380                        if old_parts.len() >= 2
381                            && new_parts.len() >= 2
382                            && old_parts.len() == new_parts.len()
383                        {
384                            let old_filtered: Vec<&str> = old_parts
385                                .iter()
386                                .filter(|v| **v != "true" && **v != "false")
387                                .copied()
388                                .collect();
389                            let new_filtered: Vec<&str> = new_parts
390                                .iter()
391                                .filter(|v| **v != "true" && **v != "false")
392                                .copied()
393                                .collect();
394                            old_filtered == new_filtered
395                        } else {
396                            old_comment == new_comment
397                        }
398                    } else {
399                        old_comment == new_comment
400                    };
401
402                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
403                let new_type = if sql_parts.len() > 1 {
404                    sql_parts[1].to_lowercase()
405                } else {
406                    String::new()
407                };
408
409                let type_matches = match old_type {
410                    "integer" => {
411                        new_type.contains("int")
412                            && !new_type.contains("bigint")
413                            && !new_type.contains("smallint")
414                    }
415                    "bigint" => new_type.contains("bigint"),
416                    "smallint" => new_type.contains("smallint"),
417                    "boolean" => new_type.contains("boolean"),
418                    "text" => new_type.contains("text"),
419                    "character varying" => {
420                        if !new_type.contains("varchar") {
421                            false
422                        } else {
423                            let old_len = old_info["max_length"].as_i64().unwrap_or(0);
424                            let new_len = new_type
425                                .trim_start_matches("varchar(")
426                                .trim_end_matches(')')
427                                .parse::<i64>()
428                                .unwrap_or(0);
429                            let matched = old_len == new_len || new_len == 0;
430                            if !matched {
431                                log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
432                            }
433                            old_len == new_len || new_len == 0
434                        }
435                    }
436                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
437                    "numeric" => {
438                        if !(new_type.contains("numeric") || new_type.contains("decimal")) {
439                            false
440                        } else {
441                            let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
442                            let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
443                            let inner = new_type
444                                .replace("numeric(", "")
445                                .replace("decimal(", "")
446                                .replace(')', "");
447                            let parts: Vec<&str> = inner.split(',').collect();
448                            let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
449                            let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
450                            old_prec == new_prec && old_scale == new_scale
451                        }
452                    }
453                    "double precision" => {
454                        new_type.contains("double") || new_type.contains("float8")
455                    }
456                    "real" => new_type.contains("real") || new_type.contains("float4"),
457                    "timestamp without time zone" | "timestamp with time zone" => {
458                        new_type.contains("timestamp")
459                    }
460                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
461                    "time without time zone" | "time with time zone" => {
462                        new_type.contains("time") && !new_type.contains("timestamp")
463                    }
464                    "json" | "jsonb" => new_type.contains("json"),
465                    "uuid" => new_type.contains("uuid"),
466                    "bytea" => new_type.contains("bytea"),
467                    _ => old_type == new_type,
468                };
469
470                if type_matches && comment_matches {
471                    continue;
472                }
473
474                log::debug!(
475                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
476                    options.table_name,
477                    name,
478                    type_matches,
479                    old_type,
480                    new_type,
481                    comment_matches
482                );
483                put.push(name);
484            } else {
485                add.push(name);
486            }
487        }
488
489        for name in add.iter() {
490            let name = name.to_string();
491            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
492            let rows = row.split("--").collect::<Vec<&str>>();
493            comments.push(format!(
494                r#"ALTER TABLE "{}" add {};"#,
495                options.table_name,
496                rows[0].trim()
497            ));
498            if rows.len() > 1 {
499                comments.push(format!(
500                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
501                    options.table_name,
502                    name,
503                    rows[1].trim()
504                ));
505            }
506        }
507        for name in del.iter() {
508            comments.push(format!(
509                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
510                options.table_name, name
511            ));
512        }
513        for name in put.iter() {
514            let name = name.to_string();
515            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
516            let rows = row.split("--").collect::<Vec<&str>>();
517
518            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
519
520            if sql[1].contains("BOOLEAN") {
521                let text = format!(
522                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
523                    options.table_name, name
524                );
525                comments.push(text.clone());
526                let text = format!(
527                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
528                    options.table_name, name, sql[1]
529                );
530                comments.push(text.clone());
531            } else {
532                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
533                let new_type_lower = sql[1].to_lowercase();
534                let is_date_to_numeric = (old_col_type == "date"
535                    || old_col_type.contains("timestamp"))
536                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
537                if is_date_to_numeric {
538                    comments.push(format!(
539                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
540                        options.table_name, name
541                    ));
542                    comments.push(format!(
543                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
544                        options.table_name, name, sql[1], name, name, name
545                    ));
546                } else {
547                    let text = format!(
548                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
549                        options.table_name, name, sql[1]
550                    );
551                    comments.push(text.clone());
552                }
553            };
554
555            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
556                let default_value = rows[0][default_pos + 9..].trim();
557                if !default_value.is_empty() {
558                    comments.push(format!(
559                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
560                        options.table_name, name, default_value
561                    ));
562                }
563            }
564            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
565            // 如果现有字段有 NOT NULL 约束,移除它
566            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
567                .as_str()
568                .unwrap_or("YES");
569            let old_is_required = old_is_nullable == "NO";
570
571            // 跳过主键字段,主键隐含 NOT NULL 约束
572            if old_is_required && name != options.table_key {
573                comments.push(format!(
574                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
575                    options.table_name, name
576                ));
577            }
578
579            if rows.len() > 1 {
580                comments.push(format!(
581                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
582                    options.table_name,
583                    name,
584                    rows[1].trim()
585                ));
586            }
587        }
588
589        let mut unique_new = vec![];
590        let mut index_new = vec![];
591        let mut primary_key = vec![];
592        let (_, index_list) = self.query(
593            format!(
594                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
595                options.table_name
596            )
597            .as_str(),
598        );
599        for item in index_list.members() {
600            let key_name = item["indexname"].as_str().unwrap_or("");
601            let indexdef = item["indexdef"].to_string();
602
603            if indexdef.contains(
604                format!(
605                    "CREATE UNIQUE INDEX {}_{} ON",
606                    options.table_name, options.table_key
607                )
608                .as_str(),
609            ) {
610                primary_key.push(key_name.to_string());
611                continue;
612            }
613            if indexdef.contains("CREATE UNIQUE INDEX") {
614                unique_new.push(key_name.to_string());
615                continue;
616            }
617            if indexdef.contains("CREATE INDEX") {
618                index_new.push(key_name.to_string());
619                continue;
620            }
621        }
622
623        if !options.table_unique.is_empty() {
624            let full_name = format!(
625                "{}_unique_{}",
626                options.table_name,
627                options.table_unique.join("_")
628            );
629            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
630            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
631            let unique = format!(
632                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
633                name,
634                options.table_name,
635                options.table_unique.join(",")
636            );
637            if !unique_new.contains(&name) {
638                comments.push(unique);
639            }
640            unique_new.retain(|x| *x != name);
641        }
642
643        for row in options.table_index.iter() {
644            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
645            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
646            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
647            let index = format!(
648                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
649                name,
650                options.table_name,
651                row.join(",")
652            );
653            if !index_new.contains(&name) {
654                comments.push(index);
655            }
656            index_new.retain(|x| *x != name);
657        }
658
659        for item in unique_new {
660            if item.ends_with("_pkey") {
661                continue;
662            }
663            if item.starts_with("unique_") {
664                comments.push(format!(
665                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
666                    options.table_name,
667                    item.clone()
668                ));
669            } else {
670                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
671            }
672        }
673        for item in index_new {
674            if item.ends_with("_pkey") {
675                continue;
676            }
677            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
678        }
679
680        if self.params.sql {
681            return JsonValue::from(comments.join(""));
682        }
683
684        if comments.is_empty() {
685            return JsonValue::from(-1);
686        }
687
688        for item in comments.iter() {
689            let (state, res) = self.execute(item.as_str());
690            match state {
691                true => {}
692                false => {
693                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
694                    return JsonValue::from(0);
695                }
696            }
697        }
698        JsonValue::from(1)
699    }
700
701    fn table_info(&mut self, table: &str) -> JsonValue {
702        let sql = format!(
703            "SELECT  COL.COLUMN_NAME,
704    COL.DATA_TYPE,
705    COL.IS_NULLABLE,
706    COL.CHARACTER_MAXIMUM_LENGTH,
707    COL.NUMERIC_PRECISION,
708    COL.NUMERIC_SCALE,
709    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
710    LEFT JOIN
711    pg_catalog.pg_description DESCRIPTION
712    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
713    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
714        let (state, data) = self.query(sql.as_str());
715        let mut list = object! {};
716        if state {
717            for item in data.members() {
718                let mut row = object! {};
719                row["field"] = item["column_name"].clone();
720                row["comment"] = item["comment"].clone();
721                row["type"] = item["data_type"].clone();
722                row["is_nullable"] = item["is_nullable"].clone();
723                row["max_length"] = item["character_maximum_length"].clone();
724                row["numeric_precision"] = item["numeric_precision"].clone();
725                row["numeric_scale"] = item["numeric_scale"].clone();
726                if let Some(field_name) = row["field"].as_str() {
727                    list[field_name] = row.clone();
728                }
729            }
730            list
731        } else {
732            list
733        }
734    }
735
736    fn table_is_exist(&mut self, name: &str) -> bool {
737        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
738        let (state, data) = self.query(sql.as_str());
739        match state {
740            true => {
741                for item in data.members() {
742                    if item.has_key("exists") {
743                        return item["exists"].as_bool().unwrap_or(false);
744                    }
745                }
746                false
747            }
748            false => false,
749        }
750    }
751
752    fn table(&mut self, name: &str) -> &mut Pgsql {
753        self.params = Params::default(self.connection.mode.str().as_str());
754        let table_name = format!("{}{}", self.connection.prefix, name);
755        if !super::sql_safety::validate_table_name(&table_name) {
756            error!("Invalid table name: {}", name);
757        }
758        self.params.table = table_name.clone();
759        self.params.join_table = table_name;
760        self
761    }
762
763    fn change_table(&mut self, name: &str) -> &mut Self {
764        self.params.join_table = name.to_string();
765        self
766    }
767
768    fn autoinc(&mut self) -> &mut Self {
769        self.params.autoinc = true;
770        self
771    }
772
773    fn timestamps(&mut self) -> &mut Self {
774        self.params.timestamps = true;
775        self
776    }
777
778    fn fetch_sql(&mut self) -> &mut Self {
779        self.params.sql = true;
780        self
781    }
782
783    fn order(&mut self, field: &str, by: bool) -> &mut Self {
784        self.params.order[field] = {
785            if by {
786                "DESC"
787            } else {
788                "ASC"
789            }
790        }
791        .into();
792        self
793    }
794
795    fn group(&mut self, field: &str) -> &mut Self {
796        let fields: Vec<&str> = field.split(",").collect();
797        for field in fields.iter() {
798            let field = field.to_string();
799            self.params.group[field.as_str()] = field.clone().into();
800            self.params.fields[field.as_str()] = field.clone().into();
801        }
802        self
803    }
804
805    fn distinct(&mut self) -> &mut Self {
806        self.params.distinct = true;
807        self
808    }
809
810    fn json(&mut self, field: &str) -> &mut Self {
811        let list: Vec<&str> = field.split(",").collect();
812        for item in list.iter() {
813            self.params.json[item.to_string().as_str()] = item.to_string().into();
814        }
815        self
816    }
817
818    fn location(&mut self, field: &str) -> &mut Self {
819        let list: Vec<&str> = field.split(",").collect();
820        for item in list.iter() {
821            self.params.location[item.to_string().as_str()] = item.to_string().into();
822        }
823        self
824    }
825
826    fn field(&mut self, field: &str) -> &mut Self {
827        let list: Vec<&str> = field.split(",").collect();
828        let join_table = if self.params.join_table.is_empty() {
829            self.params.table.clone()
830        } else {
831            self.params.join_table.clone()
832        };
833        for item in list.iter() {
834            let lower = item.to_lowercase();
835            let is_expr = lower.contains("count(")
836                || lower.contains("sum(")
837                || lower.contains("avg(")
838                || lower.contains("max(")
839                || lower.contains("min(")
840                || lower.contains("case ");
841            if is_expr {
842                self.params.fields[item.to_string().as_str()] = (*item).into();
843            } else if item.contains(" as ") {
844                let text = item.split(" as ").collect::<Vec<&str>>();
845                self.params.fields[item.to_string().as_str()] =
846                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
847            } else {
848                self.params.fields[item.to_string().as_str()] =
849                    format!("{join_table}.{item}").into();
850            }
851        }
852        self
853    }
854
855    fn field_raw(&mut self, expr: &str) -> &mut Self {
856        self.params.fields[expr] = expr.into();
857        self
858    }
859
860    fn hidden(&mut self, name: &str) -> &mut Self {
861        let hidden: Vec<&str> = name.split(",").collect();
862
863        let fields_list = self.table_info(self.params.clone().table.as_str());
864        let mut data = array![];
865        for item in fields_list.members() {
866            let _ = data.push(object! {
867                "name":item["field"].as_str().unwrap_or("")
868            });
869        }
870
871        for item in data.members() {
872            let name = item["name"].as_str().unwrap_or("");
873            if !hidden.contains(&name) {
874                self.params.fields[name] = name.into();
875            }
876        }
877        self
878    }
879
880    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
881        for f in field.split('|') {
882            if !super::sql_safety::validate_field_name(f) {
883                error!("Invalid field name: {}", f);
884            }
885        }
886        if !super::sql_safety::validate_compare_orator(compare) {
887            error!("Invalid compare operator: {}", compare);
888        }
889        let join_table = if self.params.join_table.is_empty() {
890            self.params.table.clone()
891        } else {
892            self.params.join_table.clone()
893        };
894        if value.is_boolean() {
895            let bool_val = value.as_bool().unwrap_or(false);
896            self.params
897                .where_and
898                .push(format!("{join_table}.{field} {compare} {bool_val}"));
899            return self;
900        }
901        match compare {
902            "between" => {
903                self.params.where_and.push(format!(
904                    "{}.{} between '{}' AND '{}'",
905                    join_table, field, value[0], value[1]
906                ));
907            }
908            "set" => {
909                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
910                let mut wheredata = vec![];
911                for item in list.iter() {
912                    wheredata.push(format!(
913                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
914                    ));
915                }
916                self.params
917                    .where_and
918                    .push(format!("({})", wheredata.join(" or ")));
919            }
920            "notin" => {
921                let mut text = String::new();
922                for item in value.members() {
923                    text = format!("{text},'{item}'");
924                }
925                text = text.trim_start_matches(",").into();
926                self.params
927                    .where_and
928                    .push(format!("{join_table}.{field} not in ({text})"));
929            }
930            "is" => {
931                self.params
932                    .where_and
933                    .push(format!("{join_table}.{field} is {value}"));
934            }
935            "isnot" => {
936                self.params
937                    .where_and
938                    .push(format!("{join_table}.{field} is not {value}"));
939            }
940            "notlike" => {
941                self.params
942                    .where_and
943                    .push(format!("{join_table}.{field} not like '{value}'"));
944            }
945            "in" => {
946                if value.is_array() && value.is_empty() {
947                    self.params.where_and.push("1=0".to_string());
948                    return self;
949                }
950                let mut text = String::new();
951                if value.is_array() {
952                    for item in value.members() {
953                        text = format!("{text},'{item}'");
954                    }
955                } else if value.is_null() {
956                    text = format!("{text},null");
957                } else {
958                    let value = value.as_str().unwrap_or("");
959
960                    let value: Vec<&str> = value.split(",").collect();
961                    for item in value.iter() {
962                        text = format!("{text},'{item}'");
963                    }
964                }
965                text = text.trim_start_matches(",").into();
966
967                self.params
968                    .where_and
969                    .push(format!("{join_table}.{field} {compare} ({text})"));
970            }
971            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
972            // 用法:.where_and("tags", "json_contains", "紧急".into())
973            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
974            "json_contains" => {
975                if value.is_array() {
976                    if value.is_empty() {
977                        self.params.where_and.push("1=0".to_string());
978                    } else {
979                        let mut parts = vec![];
980                        for item in value.members() {
981                            let escaped = super::sql_safety::escape_string(&item.to_string());
982                            parts.push(format!(
983                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
984                                escaped
985                            ));
986                        }
987                        self.params
988                            .where_and
989                            .push(format!("({})", parts.join(" OR ")));
990                    }
991                } else {
992                    let escaped = super::sql_safety::escape_string(&value.to_string());
993                    self.params.where_and.push(format!(
994                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
995                        escaped
996                    ));
997                }
998            }
999            _ => {
1000                self.params
1001                    .where_and
1002                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1003            }
1004        }
1005        self
1006    }
1007
1008    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1009        for f in field.split('|') {
1010            if !super::sql_safety::validate_field_name(f) {
1011                error!("Invalid field name: {}", f);
1012            }
1013        }
1014        if !super::sql_safety::validate_compare_orator(compare) {
1015            error!("Invalid compare operator: {}", compare);
1016        }
1017        let join_table = if self.params.join_table.is_empty() {
1018            self.params.table.clone()
1019        } else {
1020            self.params.join_table.clone()
1021        };
1022
1023        if value.is_boolean() {
1024            let bool_val = value.as_bool().unwrap_or(false);
1025            self.params
1026                .where_or
1027                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1028            return self;
1029        }
1030
1031        match compare {
1032            "between" => {
1033                self.params.where_or.push(format!(
1034                    "{}.{} between '{}' AND '{}'",
1035                    join_table, field, value[0], value[1]
1036                ));
1037            }
1038            "set" => {
1039                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1040                let mut wheredata = vec![];
1041                for item in list.iter() {
1042                    wheredata.push(format!(
1043                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1044                    ));
1045                }
1046                self.params
1047                    .where_or
1048                    .push(format!("({})", wheredata.join(" or ")));
1049            }
1050            "notin" => {
1051                let mut text = String::new();
1052                for item in value.members() {
1053                    text = format!("{text},'{item}'");
1054                }
1055                text = text.trim_start_matches(",").into();
1056                self.params
1057                    .where_or
1058                    .push(format!("{join_table}.{field} not in ({text})"));
1059            }
1060            "is" => {
1061                self.params
1062                    .where_or
1063                    .push(format!("{join_table}.{field} is {value}"));
1064            }
1065            "isnot" => {
1066                self.params
1067                    .where_or
1068                    .push(format!("{join_table}.{field} is not {value}"));
1069            }
1070            "in" => {
1071                if value.is_array() && value.is_empty() {
1072                    self.params.where_or.push("1=0".to_string());
1073                    return self;
1074                }
1075                let mut text = String::new();
1076                if value.is_array() {
1077                    for item in value.members() {
1078                        text = format!("{text},'{item}'");
1079                    }
1080                } else {
1081                    let value = value.as_str().unwrap_or("");
1082                    let value: Vec<&str> = value.split(",").collect();
1083                    for item in value.iter() {
1084                        text = format!("{text},'{item}'");
1085                    }
1086                }
1087                text = text.trim_start_matches(",").into();
1088                self.params
1089                    .where_or
1090                    .push(format!("{join_table}.{field} {compare} ({text})"));
1091            }
1092            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1093            // 用法:.where_or("tags", "json_contains", "紧急".into())
1094            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1095            "json_contains" => {
1096                if value.is_array() {
1097                    if value.is_empty() {
1098                        self.params.where_or.push("1=0".to_string());
1099                    } else {
1100                        let mut parts = vec![];
1101                        for item in value.members() {
1102                            let escaped = super::sql_safety::escape_string(&item.to_string());
1103                            parts.push(format!(
1104                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1105                                escaped
1106                            ));
1107                        }
1108                        self.params
1109                            .where_or
1110                            .push(format!("({})", parts.join(" OR ")));
1111                    }
1112                } else {
1113                    let escaped = super::sql_safety::escape_string(&value.to_string());
1114                    self.params.where_or.push(format!(
1115                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1116                        escaped
1117                    ));
1118                }
1119            }
1120            _ => {
1121                self.params
1122                    .where_or
1123                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1124            }
1125        }
1126        self
1127    }
1128
1129    fn where_raw(&mut self, expr: &str) -> &mut Self {
1130        self.params.where_and.push(expr.to_string());
1131        self
1132    }
1133
1134    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1135        self.params
1136            .where_and
1137            .push(format!("\"{field}\" IN ({sub_sql})"));
1138        self
1139    }
1140
1141    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1142        self.params
1143            .where_and
1144            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1145        self
1146    }
1147
1148    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1149        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1150        self
1151    }
1152
1153    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1154        self.params
1155            .where_and
1156            .push(format!("NOT EXISTS ({sub_sql})"));
1157        self
1158    }
1159
1160    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1161        self.params.where_column = format!(
1162            "{}.{} {} {}.{}",
1163            self.params.table, field_a, compare, self.params.table, field_b
1164        );
1165        self
1166    }
1167
1168    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1169        self.params
1170            .update_column
1171            .push(format!("{field_a} = {compare}"));
1172        self
1173    }
1174
1175    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1176        self.params.page = page;
1177        self.params.limit = limit;
1178        self
1179    }
1180
1181    fn limit(&mut self, count: i32) -> &mut Self {
1182        self.params.limit_only = count;
1183        self
1184    }
1185
1186    fn column(&mut self, field: &str) -> JsonValue {
1187        self.field(field);
1188        let sql = self.params.select_sql();
1189
1190        if self.params.sql {
1191            return JsonValue::from(sql);
1192        }
1193        let (state, data) = self.query(sql.as_str());
1194        match state {
1195            true => {
1196                let mut list = array![];
1197                for item in data.members() {
1198                    if self.params.json[field].is_empty() {
1199                        let _ = list.push(item[field].clone());
1200                    } else {
1201                        let data =
1202                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1203                        let _ = list.push(data);
1204                    }
1205                }
1206                list
1207            }
1208            false => {
1209                array![]
1210            }
1211        }
1212    }
1213
1214    fn count(&mut self) -> JsonValue {
1215        self.params.fields = json::object! {};
1216        self.params.fields["count"] = "count(*) as count".to_string().into();
1217        let sql = self.params.select_sql();
1218        if self.params.sql {
1219            return JsonValue::from(sql.clone());
1220        }
1221        let (state, data) = self.query(sql.as_str());
1222        if state {
1223            data[0]["count"].clone()
1224        } else {
1225            JsonValue::from(0)
1226        }
1227    }
1228
1229    fn max(&mut self, field: &str) -> JsonValue {
1230        self.params.fields[field] = format!("max({field}) as {field}").into();
1231        let sql = self.params.select_sql();
1232        if self.params.sql {
1233            return JsonValue::from(sql.clone());
1234        }
1235        let (state, data) = self.query(sql.as_str());
1236        if state {
1237            if data.len() > 1 {
1238                return data.clone();
1239            }
1240            data[0][field].clone()
1241        } else {
1242            JsonValue::from(0)
1243        }
1244    }
1245
1246    fn min(&mut self, field: &str) -> JsonValue {
1247        self.params.fields[field] = format!("min({field}) as {field}").into();
1248        let sql = self.params.select_sql();
1249        if self.params.sql {
1250            return JsonValue::from(sql.clone());
1251        }
1252        let (state, data) = self.query(sql.as_str());
1253        if state {
1254            if data.len() > 1 {
1255                return data;
1256            }
1257            data[0][field].clone()
1258        } else {
1259            JsonValue::from(0)
1260        }
1261    }
1262
1263    fn sum(&mut self, field: &str) -> JsonValue {
1264        self.params.fields[field] = format!("sum({field}) as {field}").into();
1265        let sql = self.params.select_sql();
1266        if self.params.sql {
1267            return JsonValue::from(sql.clone());
1268        }
1269        let (state, data) = self.query(sql.as_str());
1270        match state {
1271            true => {
1272                if data.len() > 1 {
1273                    return data;
1274                }
1275                data[0][field].clone()
1276            }
1277            false => JsonValue::from(0),
1278        }
1279    }
1280
1281    fn avg(&mut self, field: &str) -> JsonValue {
1282        self.params.fields[field] = format!("avg({field}) as {field}").into();
1283        let sql = self.params.select_sql();
1284        if self.params.sql {
1285            return JsonValue::from(sql.clone());
1286        }
1287        let (state, data) = self.query(sql.as_str());
1288        if state {
1289            if data.len() > 1 {
1290                return data;
1291            }
1292            data[0][field].clone()
1293        } else {
1294            JsonValue::from(0)
1295        }
1296    }
1297
1298    fn having(&mut self, expr: &str) -> &mut Self {
1299        self.params.having.push(expr.to_string());
1300        self
1301    }
1302
1303    fn select(&mut self) -> JsonValue {
1304        let sql = self.params.select_sql();
1305        if self.params.sql {
1306            return JsonValue::from(sql.clone());
1307        }
1308        let (state, mut data) = self.query(sql.as_str());
1309        match state {
1310            true => {
1311                for (field, _) in self.params.json.entries() {
1312                    for item in data.members_mut() {
1313                        if !item[field].is_empty() {
1314                            let json = item[field].to_string();
1315                            item[field] = match json::parse(&json) {
1316                                Ok(e) => e,
1317                                Err(_) => JsonValue::from(json),
1318                            };
1319                        }
1320                    }
1321                }
1322                data.clone()
1323            }
1324            false => array![],
1325        }
1326    }
1327
1328    fn find(&mut self) -> JsonValue {
1329        self.params.page = 1;
1330        self.params.limit = 1;
1331        let sql = self.params.select_sql();
1332        if self.params.sql {
1333            return JsonValue::from(sql.clone());
1334        }
1335        let (state, mut data) = self.query(sql.as_str());
1336        match state {
1337            true => {
1338                if data.is_empty() {
1339                    return object! {};
1340                }
1341                for (field, _) in self.params.json.entries() {
1342                    if !data[0][field].is_empty() {
1343                        let json = data[0][field].to_string();
1344                        let json = json::parse(&json).unwrap_or(array![]);
1345                        data[0][field] = json;
1346                    } else {
1347                        data[0][field] = array![];
1348                    }
1349                }
1350                data[0].clone()
1351            }
1352            false => {
1353                object! {}
1354            }
1355        }
1356    }
1357
1358    fn value(&mut self, field: &str) -> JsonValue {
1359        self.params.fields = object! {};
1360        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1361        self.params.page = 1;
1362        self.params.limit = 1;
1363        let sql = self.params.select_sql();
1364        if self.params.sql {
1365            return JsonValue::from(sql.clone());
1366        }
1367        let (state, mut data) = self.query(sql.as_str());
1368        match state {
1369            true => {
1370                for (field, _) in self.params.json.entries() {
1371                    if !data[0][field].is_empty() {
1372                        let json = data[0][field].to_string();
1373                        let json = json::parse(&json).unwrap_or(array![]);
1374                        data[0][field] = json;
1375                    } else {
1376                        data[0][field] = array![];
1377                    }
1378                }
1379                data[0][field].clone()
1380            }
1381            false => {
1382                if self.connection.debug {
1383                    info!("{data:?}");
1384                }
1385                JsonValue::Null
1386            }
1387        }
1388    }
1389
1390    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1391        let fields_list = self.table_info(&self.params.table.clone());
1392        let mut fields = vec![];
1393        let mut values = vec![];
1394        if !self.params.autoinc && data["id"].is_empty() {
1395            let thread_id = format!("{:?}", std::thread::current().id());
1396            let thread_num: u64 = thread_id
1397                .trim_start_matches("ThreadId(")
1398                .trim_end_matches(")")
1399                .parse()
1400                .unwrap_or(0);
1401            data["id"] = format!(
1402                "{:X}{:X}",
1403                Local::now().timestamp_nanos_opt().unwrap_or(0),
1404                thread_num
1405            )
1406            .into();
1407        }
1408        for (field, value) in data.entries() {
1409            fields.push(format!("\"{}\"", field));
1410
1411            if value.is_string() {
1412                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1413                continue;
1414            } else if value.is_array() {
1415                if self.params.json[field].is_empty() {
1416                    let array = value
1417                        .members()
1418                        .map(|x| x.as_str().unwrap_or(""))
1419                        .collect::<Vec<&str>>()
1420                        .join(",");
1421                    values.push(format!("'{array}'"));
1422                } else {
1423                    let json = value.to_string();
1424                    let json = json.replace("'", "''");
1425                    values.push(format!("'{json}'"));
1426                }
1427                continue;
1428            } else if value.is_object() {
1429                if self.params.json[field].is_empty() {
1430                    values.push(format!("'{value}'"));
1431                } else {
1432                    let json = value.to_string();
1433                    let json = json.replace("'", "''");
1434                    values.push(format!("'{json}'"));
1435                }
1436                continue;
1437            } else if value.is_number() {
1438                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1439                if col_type == "boolean" {
1440                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1441                    values.push(format!("{bool_val}"));
1442                } else if col_type.contains("int") {
1443                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1444                } else {
1445                    values.push(format!("{value}"));
1446                }
1447                continue;
1448            } else if value.is_boolean() || value.is_null() {
1449                values.push(format!("{value}"));
1450                continue;
1451            } else {
1452                values.push(format!("'{value}'"));
1453                continue;
1454            }
1455        }
1456        let fields = fields.join(",");
1457        let values = values.join(",");
1458
1459        let sql = format!(
1460            "INSERT INTO {} ({}) VALUES ({});",
1461            self.params.table, fields, values
1462        );
1463        if self.params.sql {
1464            return JsonValue::from(sql.clone());
1465        }
1466        let (state, ids) = self.execute(sql.as_str());
1467
1468        match state {
1469            true => match self.params.autoinc {
1470                true => ids.clone(),
1471                false => data["id"].clone(),
1472            },
1473            false => {
1474                let thread_id = format!("{:?}", thread::current().id());
1475                error!("添加失败: {thread_id} {ids:?} {sql}");
1476                JsonValue::from("")
1477            }
1478        }
1479    }
1480    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1481        let fields_list = self.table_info(&self.params.table.clone());
1482        let mut fields = String::new();
1483        if !self.params.autoinc && data[0]["id"].is_empty() {
1484            data[0]["id"] = "".into();
1485        }
1486        for (field, _) in data[0].entries() {
1487            fields = format!("{fields},\"{field}\"");
1488        }
1489        fields = fields.trim_start_matches(",").to_string();
1490
1491        let core_count = num_cpus::get();
1492        let mut p = pools::Pool::new(core_count * 4);
1493
1494        let autoinc = self.params.autoinc;
1495        for list in data.members() {
1496            let mut item = list.clone();
1497            let i = br_fields::str::Code::verification_code(3);
1498            let fields_list_new = fields_list.clone();
1499            p.execute(move |pcindex| {
1500                if !autoinc && item["id"].is_empty() {
1501                    let id = format!(
1502                        "{:X}{:X}{}",
1503                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1504                        pcindex,
1505                        i
1506                    );
1507                    item["id"] = id.into();
1508                }
1509                let mut values = "".to_string();
1510                for (field, value) in item.entries() {
1511                    if value.is_string() {
1512                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1513                    } else if value.is_number() {
1514                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1515                        if col_type == "boolean" {
1516                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1517                            values = format!("{values},{bool_val}");
1518                        } else if col_type.contains("int") {
1519                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1520                        } else {
1521                            values = format!("{values},{value}");
1522                        }
1523                    } else if value.is_boolean() {
1524                        values = format!("{values},{value}");
1525                        continue;
1526                    } else {
1527                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1528                    }
1529                }
1530                values = format!("({})", values.trim_start_matches(","));
1531                array![item["id"].clone(), values]
1532            });
1533        }
1534        let (ids_list, mut values) = p.insert_all();
1535        values = values.trim_start_matches(",").to_string();
1536        let sql = format!(
1537            "INSERT INTO {} ({}) VALUES {};",
1538            self.params.table, fields, values
1539        );
1540
1541        if self.params.sql {
1542            return JsonValue::from(sql.clone());
1543        }
1544        let (state, data) = self.execute(sql.as_str());
1545        match state {
1546            true => match autoinc {
1547                true => data,
1548                false => JsonValue::from(ids_list),
1549            },
1550            false => {
1551                error!("insert_all: {data:?}");
1552                let _ = std::fs::write("pgsql_insert_all_error.log", format!("insert_all error:\n{:?}\n\nSQL:\n{}", data, sql));
1553                array![]
1554            }
1555        }
1556    }
1557    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1558        let fields_list = self.table_info(&self.params.table.clone());
1559        let mut fields = vec![];
1560        let mut values = vec![];
1561        if !self.params.autoinc && data["id"].is_empty() {
1562            let thread_id = format!("{:?}", std::thread::current().id());
1563            let thread_num: u64 = thread_id
1564                .trim_start_matches("ThreadId(")
1565                .trim_end_matches(")")
1566                .parse()
1567                .unwrap_or(0);
1568            data["id"] = format!(
1569                "{:X}{:X}",
1570                Local::now().timestamp_nanos_opt().unwrap_or(0),
1571                thread_num
1572            )
1573            .into();
1574        }
1575        for (field, value) in data.entries() {
1576            fields.push(format!("\"{}\"", field));
1577
1578            if value.is_string() {
1579                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1580                continue;
1581            } else if value.is_array() {
1582                if self.params.json[field].is_empty() {
1583                    let array = value
1584                        .members()
1585                        .map(|x| x.as_str().unwrap_or(""))
1586                        .collect::<Vec<&str>>()
1587                        .join(",");
1588                    values.push(format!("'{array}'"));
1589                } else {
1590                    let json = value.to_string();
1591                    let json = json.replace("'", "''");
1592                    values.push(format!("'{json}'"));
1593                }
1594                continue;
1595            } else if value.is_object() {
1596                if self.params.json[field].is_empty() {
1597                    values.push(format!("'{value}'"));
1598                } else {
1599                    let json = value.to_string();
1600                    let json = json.replace("'", "''");
1601                    values.push(format!("'{json}'"));
1602                }
1603                continue;
1604            } else if value.is_number() {
1605                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1606                if col_type == "boolean" {
1607                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1608                    values.push(format!("{bool_val}"));
1609                } else if col_type.contains("int") {
1610                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1611                } else {
1612                    values.push(format!("{value}"));
1613                }
1614                continue;
1615            } else if value.is_boolean() || value.is_null() {
1616                values.push(format!("{value}"));
1617                continue;
1618            } else {
1619                values.push(format!("'{value}'"));
1620                continue;
1621            }
1622        }
1623
1624        let conflict_cols: Vec<String> = conflict_fields
1625            .iter()
1626            .map(|f| format!("\"{}\"", f))
1627            .collect();
1628
1629        let update_set: Vec<String> = fields
1630            .iter()
1631            .filter(|f| {
1632                let name = f.trim_matches('"');
1633                !conflict_fields.contains(&name) && name != "id"
1634            })
1635            .map(|f| format!("{f}=EXCLUDED.{f}"))
1636            .collect();
1637
1638        let fields_str = fields.join(",");
1639        let values_str = values.join(",");
1640
1641        let sql = format!(
1642            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1643            self.params.table,
1644            fields_str,
1645            values_str,
1646            conflict_cols.join(","),
1647            update_set.join(",")
1648        );
1649        if self.params.sql {
1650            return JsonValue::from(sql.clone());
1651        }
1652        let (state, result) = self.execute(sql.as_str());
1653        match state {
1654            true => match self.params.autoinc {
1655                true => result.clone(),
1656                false => data["id"].clone(),
1657            },
1658            false => {
1659                let thread_id = format!("{:?}", thread::current().id());
1660                error!("upsert失败: {thread_id} {result:?} {sql}");
1661                JsonValue::from("")
1662            }
1663        }
1664    }
1665    fn update(&mut self, data: JsonValue) -> JsonValue {
1666        let fields_list = self.table_info(&self.params.table.clone());
1667        let mut values = vec![];
1668        for (field, value) in data.entries() {
1669            if value.is_string() {
1670                values.push(format!(
1671                    "\"{}\"='{}'",
1672                    field,
1673                    value.to_string().replace("'", "''")
1674                ));
1675            } else if value.is_number() {
1676                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1677                if col_type == "boolean" {
1678                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1679                    values.push(format!("\"{field}\"= {bool_val}"));
1680                } else if col_type.contains("int") {
1681                    values.push(format!(
1682                        "\"{}\"= {}",
1683                        field,
1684                        value.as_f64().unwrap_or(0.0) as i64
1685                    ));
1686                } else {
1687                    values.push(format!("\"{field}\"= {value}"));
1688                }
1689            } else if value.is_array() {
1690                if self.params.json[field].is_empty() {
1691                    let array = value
1692                        .members()
1693                        .map(|x| x.as_str().unwrap_or(""))
1694                        .collect::<Vec<&str>>()
1695                        .join(",");
1696                    values.push(format!("\"{field}\"='{array}'"));
1697                } else {
1698                    let json = value.to_string();
1699                    let json = json.replace("'", "''");
1700                    values.push(format!("\"{field}\"='{json}'"));
1701                }
1702                continue;
1703            } else if value.is_object() {
1704                if self.params.json[field].is_empty() {
1705                    values.push(format!("\"{field}\"='{value}'"));
1706                } else {
1707                    if value.is_empty() {
1708                        values.push(format!("\"{field}\"=''"));
1709                        continue;
1710                    }
1711                    let json = value.to_string();
1712                    let json = json.replace("'", "''");
1713                    values.push(format!("\"{field}\"='{json}'"));
1714                }
1715                continue;
1716            } else if value.is_boolean() {
1717                values.push(format!("\"{field}\"= {value}"));
1718            } else {
1719                values.push(format!("\"{field}\"=\"{value}\""));
1720            }
1721        }
1722
1723        for (field, value) in self.params.inc_dec.entries() {
1724            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1725        }
1726        if !self.params.update_column.is_empty() {
1727            values.extend(self.params.update_column.clone());
1728        }
1729        let values = values.join(",");
1730
1731        let sql = format!(
1732            "UPDATE {} SET {} {};",
1733            self.params.table.clone(),
1734            values,
1735            self.params.where_sql()
1736        );
1737        if self.params.sql {
1738            return JsonValue::from(sql.clone());
1739        }
1740        let (state, data) = self.execute(sql.as_str());
1741        if state {
1742            data
1743        } else {
1744            let thread_id = format!("{:?}", thread::current().id());
1745            error!("update: {thread_id} {data:?} {sql}");
1746            0.into()
1747        }
1748    }
1749    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1750        let fields_list = self.table_info(&self.params.table.clone());
1751        let mut values = vec![];
1752
1753        let mut ids = vec![];
1754        for (field, _) in data[0].entries() {
1755            if field == "id" {
1756                continue;
1757            }
1758            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1759            let mut fields = vec![];
1760            for row in data.members() {
1761                let value = row[field].clone();
1762                let id = row["id"].clone();
1763                ids.push(id.clone());
1764                if value.is_string() {
1765                    fields.push(format!(
1766                        "WHEN '{}' THEN '{}'",
1767                        id,
1768                        value.to_string().replace("'", "''")
1769                    ));
1770                } else if value.is_array() || value.is_object() {
1771                    if self.params.json[field].is_empty() {
1772                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1773                    } else {
1774                        let json = value.to_string();
1775                        let json = json.replace("'", "''");
1776                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1777                    }
1778                    continue;
1779                } else if value.is_number() {
1780                    if col_type == "boolean" {
1781                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1782                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1783                    } else {
1784                        fields.push(format!("WHEN '{id}' THEN {value}"));
1785                    }
1786                } else if value.is_boolean() || value.is_null() {
1787                    fields.push(format!("WHEN '{id}' THEN {value}"));
1788                } else {
1789                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1790                }
1791            }
1792            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1793        }
1794        self.where_and("id", "in", ids.into());
1795        for (field, value) in self.params.inc_dec.entries() {
1796            values.push(format!("{} = {}", field, value.to_string().clone()));
1797        }
1798
1799        let values = values.join(",");
1800        let sql = format!(
1801            "UPDATE {} SET {} {} {};",
1802            self.params.table.clone(),
1803            values,
1804            self.params.where_sql(),
1805            self.params.page_limit_sql()
1806        );
1807        if self.params.sql {
1808            return JsonValue::from(sql.clone());
1809        }
1810        let (state, data) = self.execute(sql.as_str());
1811        if state {
1812            data
1813        } else {
1814            error!("update_all: {data:?}");
1815            JsonValue::from(0)
1816        }
1817    }
1818
1819    fn delete(&mut self) -> JsonValue {
1820        let sql = format!(
1821            "delete FROM {} {} {};",
1822            self.params.table.clone(),
1823            self.params.where_sql(),
1824            self.params.page_limit_sql()
1825        );
1826        if self.params.sql {
1827            return JsonValue::from(sql.clone());
1828        }
1829        let (state, data) = self.execute(sql.as_str());
1830        match state {
1831            true => data,
1832            false => {
1833                error!("delete 失败>>> {data:?}");
1834                JsonValue::from(0)
1835            }
1836        }
1837    }
1838
1839    fn transaction(&mut self) -> bool {
1840        let thread_id = format!("{:?}", thread::current().id());
1841        let key = format!("{}{}", self.default, thread_id);
1842
1843        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1844            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1845            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1846            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1847            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1848            return true;
1849        }
1850
1851        // 获取事务连接,失败时重试2次(短退避)
1852        let mut conn = None;
1853        for attempt in 0..3u8 {
1854            match self.client.get_connect_for_transaction() {
1855                Ok(mut c) => {
1856                    if c.is_valid() {
1857                        conn = Some(c);
1858                        break;
1859                    }
1860                    warn!("事务连接无效(第{}次)", attempt + 1);
1861                    self.client.release_transaction_conn();
1862                }
1863                Err(e) => {
1864                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1865                }
1866            }
1867            if attempt < 2 {
1868                thread::sleep(std::time::Duration::from_millis(200));
1869            }
1870        }
1871        let mut conn = match conn {
1872            Some(c) => c,
1873            None => {
1874                error!("获取事务连接重试耗尽");
1875                return false;
1876            }
1877        };
1878
1879        if let Err(e) = conn.execute("START TRANSACTION") {
1880            error!("启动事务失败: {e}");
1881            self.client.release_transaction_conn();
1882            return false;
1883        }
1884
1885
1886        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1887        true
1888    }
1889    fn commit(&mut self) -> bool {
1890        let thread_id = format!("{:?}", thread::current().id());
1891        let key = format!("{}{}", self.default, thread_id);
1892
1893        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1894            error!("commit: 没有活跃的事务");
1895            return false;
1896        }
1897
1898        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1899        if depth > 1 {
1900            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1901            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1902            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1903            return true;
1904        }
1905
1906        let commit_result =
1907            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1908
1909        let success = match commit_result {
1910            Some(Ok(_)) => true,
1911            Some(Err(e)) => {
1912                error!("提交事务失败: {e}");
1913                false
1914            }
1915            None => {
1916                error!("提交事务失败: 未找到连接");
1917                false
1918            }
1919        };
1920
1921        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1922        self.client.release_transaction_conn();
1923        success
1924    }
1925
1926    fn rollback(&mut self) -> bool {
1927        let thread_id = format!("{:?}", thread::current().id());
1928        let key = format!("{}{}", self.default, thread_id);
1929
1930        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1931            error!("rollback: 没有活跃的事务");
1932            return false;
1933        }
1934
1935        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1936        if depth > 1 {
1937            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1938            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1939            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1940            return true;
1941        }
1942
1943        let rollback_result =
1944            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1945
1946        let success = match rollback_result {
1947            Some(Ok(_)) => true,
1948            Some(Err(e)) => {
1949                error!("回滚失败: {e}");
1950                false
1951            }
1952            None => {
1953                error!("回滚失败: 未找到连接");
1954                false
1955            }
1956        };
1957
1958        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1959        self.client.release_transaction_conn();
1960        success
1961    }
1962
1963    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1964        let (state, data) = self.query(sql);
1965        match state {
1966            true => Ok(data),
1967            false => Err(data.to_string()),
1968        }
1969    }
1970
1971    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1972        let (state, data) = self.execute(sql);
1973        match state {
1974            true => Ok(data),
1975            false => Err(data.to_string()),
1976        }
1977    }
1978
1979    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1980        self.params.inc_dec[field] = format!("{field} + {num}").into();
1981        self
1982    }
1983
1984    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1985        self.params.inc_dec[field] = format!("{field} - {num}").into();
1986        self
1987    }
1988    fn buildsql(&mut self) -> String {
1989        self.fetch_sql();
1990        let sql = self.select().to_string();
1991        format!("( {} ) {}", sql, self.params.table)
1992    }
1993
1994    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1995        for field in fields {
1996            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1997        }
1998        self
1999    }
2000
2001    fn join(
2002        &mut self,
2003        main_table: &str,
2004        main_fields: &str,
2005        right_table: &str,
2006        right_fields: &str,
2007    ) -> &mut Self {
2008        let main_table = if main_table.is_empty() {
2009            self.params.table.clone()
2010        } else {
2011            main_table.to_string()
2012        };
2013        self.params.join_table = right_table.to_string();
2014        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2015        self
2016    }
2017
2018    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2019        let main_fields = if main_fields.is_empty() {
2020            "id"
2021        } else {
2022            main_fields
2023        };
2024        let second_fields = if second_fields.is_empty() {
2025            self.params.table.clone()
2026        } else {
2027            second_fields.to_string().clone()
2028        };
2029        let sec_table_name = format!("{}{}", table, "_2");
2030        let second_table = format!("{} {}", table, sec_table_name.clone());
2031        self.params.join_table = sec_table_name.clone();
2032        self.params.join.push(format!(
2033            " INNER JOIN {} ON {}.{} = {}.{}",
2034            second_table, self.params.table, main_fields, sec_table_name, second_fields
2035        ));
2036        self
2037    }
2038
2039    fn join_right(
2040        &mut self,
2041        main_table: &str,
2042        main_fields: &str,
2043        right_table: &str,
2044        right_fields: &str,
2045    ) -> &mut Self {
2046        let main_table = if main_table.is_empty() {
2047            self.params.table.clone()
2048        } else {
2049            main_table.to_string()
2050        };
2051        self.params.join_table = right_table.to_string();
2052        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2053        self
2054    }
2055
2056    fn join_full(
2057        &mut self,
2058        main_table: &str,
2059        main_fields: &str,
2060        right_table: &str,
2061        right_fields: &str,
2062    ) -> &mut Self {
2063        let main_table = if main_table.is_empty() {
2064            self.params.table.clone()
2065        } else {
2066            main_table.to_string()
2067        };
2068        self.params.join_table = right_table.to_string();
2069        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2070        self
2071    }
2072
2073    fn union(&mut self, sub_sql: &str) -> &mut Self {
2074        self.params.unions.push(format!("UNION {sub_sql}"));
2075        self
2076    }
2077
2078    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2079        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2080        self
2081    }
2082
2083    fn lock_for_update(&mut self) -> &mut Self {
2084        self.params.lock_mode = "FOR UPDATE".to_string();
2085        self
2086    }
2087
2088    fn lock_for_share(&mut self) -> &mut Self {
2089        self.params.lock_mode = "FOR SHARE".to_string();
2090        self
2091    }
2092}