Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
11#[derive(Clone)]
12pub struct Pgsql {
13    /// 当前连接配置
14    pub connection: Connection,
15    /// 当前选中配置
16    pub default: String,
17    pub params: Params,
18    pub client: Pools,
19}
20
21impl Pgsql {
22    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23        let port = connection
24            .hostport
25            .parse::<i32>()
26            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28        let cp_connection = connection.clone();
29        let config = object! {
30            debug: cp_connection.debug,
31            username: cp_connection.username,
32            userpass: cp_connection.userpass,
33            database: cp_connection.database,
34            hostname: cp_connection.hostname,
35            hostport: port,
36            charset: cp_connection.charset.str(),
37        };
38        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40        let pools = pgsql.pools()?;
41        Ok(Self {
42            connection,
43            default: default.clone(),
44            params: Params::default("pgsql"),
45            client: pools,
46        })
47    }
48
49    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
50        let thread_id = format!("{:?}", thread::current().id());
51        let key = format!("{}{}", self.default, thread_id);
52
53        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
54            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
55
56            match result {
57                Some(Ok(e)) => {
58                    if self.connection.debug {
59                        info!("查询成功: {} {}", thread_id.clone(), sql);
60                    }
61                    (true, e.rows)
62                }
63                Some(Err(e)) => {
64                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
65                    (false, JsonValue::from(e.to_string()))
66                }
67                None => {
68                    error!("事务查询失败: 未找到事务连接 {thread_id}");
69                    (false, JsonValue::from("未找到事务连接"))
70                }
71            }
72        } else {
73            const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
74            let mut last_err = String::new();
75            for attempt in 0..3u8 {
76                let mut guard = match self.client.get_guard() {
77                    Ok(g) => g,
78                    Err(e) => {
79                        if attempt == 2 {
80                            error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
81                            return (false, JsonValue::from(e.to_string()));
82                        }
83                        let backoff = RETRY_BACKOFF_MS[attempt as usize];
84                        warn!(
85                            "非事务查询 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
86                            attempt + 1,
87                            backoff
88                        );
89                        thread::sleep(std::time::Duration::from_millis(backoff));
90                        continue;
91                    }
92                };
93                match guard.conn().query(sql) {
94                    Ok(e) => {
95                        if self.connection.debug {
96                            info!("查询成功: {} {}", thread_id.clone(), sql);
97                        }
98                        return (true, e.rows);
99                    }
100                    Err(ref e) if Self::is_retriable_error(e) => {
101                        last_err = e.to_string();
102                        guard.discard();
103                        if attempt < 2 {
104                            let backoff = RETRY_BACKOFF_MS[attempt as usize];
105                            warn!(
106                                "非事务查询连接断开(第{}次, {}ms后重试): {thread_id} {e}",
107                                attempt + 1,
108                                backoff
109                            );
110                            thread::sleep(std::time::Duration::from_millis(backoff));
111                        } else {
112                            warn!(
113                                "非事务查询连接断开(第{}次, 不再重试): {thread_id} {e}",
114                                attempt + 1
115                            );
116                        }
117                    }
118                    Err(e) => {
119                        error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
120                        return (false, JsonValue::from(e.to_string()));
121                    }
122                }
123            }
124            error!("非事务查询重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
125            (false, JsonValue::from(last_err))
126        }
127    }
128    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
129        let thread_id = format!("{:?}", thread::current().id());
130        let key = format!("{}{}", self.default, thread_id);
131
132        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
133            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
134
135            match result {
136                Some(Ok(e)) => {
137                    if self.connection.debug {
138                        info!("提交成功: {} {}", thread_id.clone(), sql);
139                    }
140                    if sql.contains("INSERT") {
141                        (true, e.rows)
142                    } else {
143                        (true, e.affect_count.into())
144                    }
145                }
146                Some(Err(e)) => {
147                    error!("事务提交失败: {thread_id} {e}");
148                    (false, JsonValue::from(e.to_string()))
149                }
150                None => {
151                    error!("事务执行失败: 未找到事务连接 {thread_id}");
152                    (false, JsonValue::from("未找到事务连接"))
153                }
154            }
155        } else {
156            // 注意: execute 重试仅适用于幂等操作(DDL、幂等UPDATE等)。
157            // 非幂等写入(如INSERT无唯一约束)在事务外重试可能导致重复数据。
158            // 当前设计:事务内写入不走此路径(上方 if 分支处理),非事务写入由调用方保证幂等性。
159            const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
160            let mut last_err = String::new();
161            for attempt in 0..3u8 {
162                let mut guard = match self.client.get_guard() {
163                    Ok(g) => g,
164                    Err(e) => {
165                        if attempt == 2 {
166                            error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
167                            return (false, JsonValue::from(e.to_string()));
168                        }
169                        let backoff = RETRY_BACKOFF_MS[attempt as usize];
170                        warn!(
171                            "非事务执行 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
172                            attempt + 1,
173                            backoff
174                        );
175                        thread::sleep(std::time::Duration::from_millis(backoff));
176                        continue;
177                    }
178                };
179                match guard.conn().execute(sql) {
180                    Ok(e) => {
181                        if self.connection.debug {
182                            info!("提交成功: {} {}", thread_id.clone(), sql);
183                        }
184                        if sql.contains("INSERT") {
185                            return (true, e.rows);
186                        } else {
187                            return (true, e.affect_count.into());
188                        }
189                    }
190                    Err(ref e) if Self::is_retriable_error(e) => {
191                        last_err = e.to_string();
192                        guard.discard();
193                        if attempt < 2 {
194                            let backoff = RETRY_BACKOFF_MS[attempt as usize];
195                            warn!(
196                                "非事务执行连接断开(第{}次, {}ms后重试): {thread_id} {e}",
197                                attempt + 1,
198                                backoff
199                            );
200                            thread::sleep(std::time::Duration::from_millis(backoff));
201                        } else {
202                            warn!(
203                                "非事务执行连接断开(第{}次, 不再重试): {thread_id} {e}",
204                                attempt + 1
205                            );
206                        }
207                    }
208                    Err(e) => {
209                        error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
210                        return (false, JsonValue::from(e.to_string()));
211                    }
212                }
213            }
214            error!("非事务执行重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
215            (false, JsonValue::from(last_err))
216        }
217    }
218
219    /// 判断是否为可重试的连接类错误(连接断开、IO错误、超时)
220    fn is_retriable_error(e: &PgsqlError) -> bool {
221        matches!(
222            e,
223            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
224        )
225    }
226}
227
228impl DbMode for Pgsql {
229    fn database_tables(&mut self) -> JsonValue {
230        let sql = "SHOW TABLES".to_string();
231        match self.sql(sql.as_str()) {
232            Ok(e) => {
233                let mut list = vec![];
234                for item in e.members() {
235                    for (_, value) in item.entries() {
236                        list.push(value.clone());
237                    }
238                }
239                list.into()
240            }
241            Err(_) => {
242                array![]
243            }
244        }
245    }
246
247    fn database_create(&mut self, name: &str) -> bool {
248        let sql = format!("CREATE DATABASE {name}");
249
250        let (state, data) = self.execute(sql.as_str());
251        match state {
252            true => data.as_bool().unwrap_or(true),
253            false => {
254                error!("创建数据库失败: {data:?}");
255                false
256            }
257        }
258    }
259
260    fn truncate(&mut self, table: &str) -> bool {
261        let sql = format!("TRUNCATE TABLE {table}");
262        let (state, _) = self.execute(sql.as_str());
263        state
264    }
265}
266
267impl Mode for Pgsql {
268    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
269        let mut sql = String::new();
270        let mut comments = vec![];
271
272        if !options.table_unique.is_empty() {
273            let full_name = format!(
274                "{}_unique_{}",
275                options.table_name,
276                options.table_unique.join("_")
277            );
278            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
279            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
280            let unique = format!(
281                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
282                name,
283                options.table_name,
284                options.table_unique.join(",")
285            );
286            comments.push(unique);
287        }
288
289        for row in options.table_index.iter() {
290            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
291            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
292            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
293            let index = format!(
294                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
295                name,
296                options.table_name,
297                row.join(",")
298            );
299            comments.push(index);
300        }
301
302        for (name, field) in options.table_fields.entries_mut() {
303            field["table_name"] = options.table_name.clone().into();
304            let row = br_fields::field("pgsql", name, field.clone());
305            let (col_sql, meta) = if let Some(idx) = row.find("--") {
306                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
307            } else {
308                (row.trim(), None)
309            };
310            if let Some(meta) = meta {
311                comments.push(format!(
312                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
313                    options.table_name, name, meta
314                ));
315            }
316            sql = format!("{} {},\r\n", sql, col_sql);
317        }
318
319        let primary_key = format!(
320            "CONSTRAINT {}_{} PRIMARY KEY ({})",
321            options.table_name, options.table_key, options.table_key
322        );
323        let sql = format!(
324            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
325            options.table_name,
326            sql.trim_end_matches(",\r\n"),
327            primary_key
328        );
329        comments.insert(0, sql);
330
331        for (_name, field) in options.table_fields.entries() {
332            let _ = field["mode"].as_str();
333        }
334
335        if self.params.sql {
336            let info = comments.join("\r\n");
337            return JsonValue::from(info);
338        }
339        for comment in comments {
340            let (state, _) = self.execute(comment.as_str());
341            match state {
342                true => {}
343                false => {
344                    return JsonValue::from(state);
345                }
346            }
347        }
348        JsonValue::from(true)
349    }
350
351    fn table_update(&mut self, options: TableOptions) -> JsonValue {
352        let fields_list = self.table_info(&options.table_name);
353        let mut put = vec![];
354        let mut add = vec![];
355        let mut del = vec![];
356        let mut comments = vec![];
357
358        for (key, _) in fields_list.entries() {
359            if options.table_fields[key].is_empty() {
360                del.push(key);
361            }
362        }
363        for (name, field) in options.table_fields.entries() {
364            if !fields_list[name].is_empty() {
365                let old_info = &fields_list[name];
366                let new_field_sql = br_fields::field("pgsql", name, field.clone());
367
368                let old_comment = old_info["comment"].as_str().unwrap_or("");
369                let old_type = old_info["type"].as_str().unwrap_or("");
370
371                let new_comment = if let Some(idx) = new_field_sql.find("--") {
372                    new_field_sql[idx + 2..].trim()
373                } else {
374                    ""
375                };
376
377                let comment_matches =
378                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
379                        let old_parts: Vec<&str> = old_comment.split('|').collect();
380                        let new_parts: Vec<&str> = new_comment.split('|').collect();
381                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
382                            old_parts[..4] == new_parts[..4]
383                        } else {
384                            old_comment == new_comment
385                        }
386                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
387                        let old_parts: Vec<&str> = old_comment.split('|').collect();
388                        let new_parts: Vec<&str> = new_comment.split('|').collect();
389                        if old_parts.len() >= 2
390                            && new_parts.len() >= 2
391                            && old_parts.len() == new_parts.len()
392                        {
393                            let old_filtered: Vec<&str> = old_parts
394                                .iter()
395                                .filter(|v| **v != "true" && **v != "false")
396                                .copied()
397                                .collect();
398                            let new_filtered: Vec<&str> = new_parts
399                                .iter()
400                                .filter(|v| **v != "true" && **v != "false")
401                                .copied()
402                                .collect();
403                            old_filtered == new_filtered
404                        } else {
405                            old_comment == new_comment
406                        }
407                    } else {
408                        old_comment == new_comment
409                    };
410
411                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
412                let new_type = if sql_parts.len() > 1 {
413                    sql_parts[1].to_lowercase()
414                } else {
415                    String::new()
416                };
417
418                let type_matches = match old_type {
419                    "integer" => {
420                        new_type.contains("int")
421                            && !new_type.contains("bigint")
422                            && !new_type.contains("smallint")
423                    }
424                    "bigint" => new_type.contains("bigint"),
425                    "smallint" => new_type.contains("smallint"),
426                    "boolean" => new_type.contains("boolean"),
427                    "text" => new_type.contains("text"),
428                    "character varying" => {
429                        if !new_type.contains("varchar") {
430                            false
431                        } else {
432                            let old_len = old_info["max_length"].as_i64().unwrap_or(0);
433                            let new_len = new_type
434                                .trim_start_matches("varchar(")
435                                .trim_end_matches(')')
436                                .parse::<i64>()
437                                .unwrap_or(0);
438                            let matched = old_len == new_len || new_len == 0;
439                            if !matched {
440                                log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
441                            }
442                            old_len == new_len || new_len == 0
443                        }
444                    }
445                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
446                    "numeric" => {
447                        if !(new_type.contains("numeric") || new_type.contains("decimal")) {
448                            false
449                        } else {
450                            let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
451                            let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
452                            let inner = new_type
453                                .replace("numeric(", "")
454                                .replace("decimal(", "")
455                                .replace(')', "");
456                            let parts: Vec<&str> = inner.split(',').collect();
457                            let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
458                            let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
459                            old_prec == new_prec && old_scale == new_scale
460                        }
461                    }
462                    "double precision" => {
463                        new_type.contains("double") || new_type.contains("float8")
464                    }
465                    "real" => new_type.contains("real") || new_type.contains("float4"),
466                    "timestamp without time zone" | "timestamp with time zone" => {
467                        new_type.contains("timestamp")
468                    }
469                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
470                    "time without time zone" | "time with time zone" => {
471                        new_type.contains("time") && !new_type.contains("timestamp")
472                    }
473                    "json" | "jsonb" => new_type.contains("json"),
474                    "uuid" => new_type.contains("uuid"),
475                    "bytea" => new_type.contains("bytea"),
476                    _ => old_type == new_type,
477                };
478
479                if type_matches && comment_matches {
480                    continue;
481                }
482
483                log::debug!(
484                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
485                    options.table_name,
486                    name,
487                    type_matches,
488                    old_type,
489                    new_type,
490                    comment_matches
491                );
492                put.push(name);
493            } else {
494                add.push(name);
495            }
496        }
497
498        for name in add.iter() {
499            let name = name.to_string();
500            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
501            let rows = row.split("--").collect::<Vec<&str>>();
502            comments.push(format!(
503                r#"ALTER TABLE "{}" add {};"#,
504                options.table_name,
505                rows[0].trim()
506            ));
507            if rows.len() > 1 {
508                comments.push(format!(
509                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
510                    options.table_name,
511                    name,
512                    rows[1].trim()
513                ));
514            }
515        }
516        for name in del.iter() {
517            comments.push(format!(
518                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
519                options.table_name, name
520            ));
521        }
522        for name in put.iter() {
523            let name = name.to_string();
524            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
525            let rows = row.split("--").collect::<Vec<&str>>();
526
527            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
528
529            if sql[1].contains("BOOLEAN") {
530                let text = format!(
531                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
532                    options.table_name, name
533                );
534                comments.push(text.clone());
535                let text = format!(
536                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
537                    options.table_name, name, sql[1]
538                );
539                comments.push(text.clone());
540            } else {
541                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
542                let new_type_lower = sql[1].to_lowercase();
543                let is_date_to_numeric = (old_col_type == "date"
544                    || old_col_type.contains("timestamp"))
545                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
546                if is_date_to_numeric {
547                    comments.push(format!(
548                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
549                        options.table_name, name
550                    ));
551                    comments.push(format!(
552                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
553                        options.table_name, name, sql[1], name, name, name
554                    ));
555                } else {
556                    let text = format!(
557                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
558                        options.table_name, name, sql[1]
559                    );
560                    comments.push(text.clone());
561                }
562            };
563
564            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
565                let default_value = rows[0][default_pos + 9..].trim();
566                if !default_value.is_empty() {
567                    comments.push(format!(
568                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
569                        options.table_name, name, default_value
570                    ));
571                }
572            }
573            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
574            // 如果现有字段有 NOT NULL 约束,移除它
575            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
576                .as_str()
577                .unwrap_or("YES");
578            let old_is_required = old_is_nullable == "NO";
579
580            // 跳过主键字段,主键隐含 NOT NULL 约束
581            if old_is_required && name != options.table_key {
582                comments.push(format!(
583                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
584                    options.table_name, name
585                ));
586            }
587
588            if rows.len() > 1 {
589                comments.push(format!(
590                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
591                    options.table_name,
592                    name,
593                    rows[1].trim()
594                ));
595            }
596        }
597
598        let mut unique_new = vec![];
599        let mut index_new = vec![];
600        let mut primary_key = vec![];
601        let (_, index_list) = self.query(
602            format!(
603                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
604                options.table_name
605            )
606            .as_str(),
607        );
608        for item in index_list.members() {
609            let key_name = item["indexname"].as_str().unwrap_or("");
610            let indexdef = item["indexdef"].to_string();
611
612            if indexdef.contains(
613                format!(
614                    "CREATE UNIQUE INDEX {}_{} ON",
615                    options.table_name, options.table_key
616                )
617                .as_str(),
618            ) {
619                primary_key.push(key_name.to_string());
620                continue;
621            }
622            if indexdef.contains("CREATE UNIQUE INDEX") {
623                unique_new.push(key_name.to_string());
624                continue;
625            }
626            if indexdef.contains("CREATE INDEX") {
627                index_new.push(key_name.to_string());
628                continue;
629            }
630        }
631
632        if !options.table_unique.is_empty() {
633            let full_name = format!(
634                "{}_unique_{}",
635                options.table_name,
636                options.table_unique.join("_")
637            );
638            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
639            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
640            let unique = format!(
641                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
642                name,
643                options.table_name,
644                options.table_unique.join(",")
645            );
646            if !unique_new.contains(&name) {
647                comments.push(unique);
648            }
649            unique_new.retain(|x| *x != name);
650        }
651
652        for row in options.table_index.iter() {
653            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
654            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
655            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
656            let index = format!(
657                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
658                name,
659                options.table_name,
660                row.join(",")
661            );
662            if !index_new.contains(&name) {
663                comments.push(index);
664            }
665            index_new.retain(|x| *x != name);
666        }
667
668        for item in unique_new {
669            if item.ends_with("_pkey") {
670                continue;
671            }
672            if item.starts_with("unique_") {
673                comments.push(format!(
674                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
675                    options.table_name,
676                    item.clone()
677                ));
678            } else {
679                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
680            }
681        }
682        for item in index_new {
683            if item.ends_with("_pkey") {
684                continue;
685            }
686            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
687        }
688
689        if self.params.sql {
690            return JsonValue::from(comments.join(""));
691        }
692
693        if comments.is_empty() {
694            return JsonValue::from(-1);
695        }
696
697        for item in comments.iter() {
698            let (state, res) = self.execute(item.as_str());
699            match state {
700                true => {}
701                false => {
702                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
703                    return JsonValue::from(0);
704                }
705            }
706        }
707        JsonValue::from(1)
708    }
709
710    fn table_info(&mut self, table: &str) -> JsonValue {
711        let sql = format!(
712            "SELECT  COL.COLUMN_NAME,
713    COL.DATA_TYPE,
714    COL.IS_NULLABLE,
715    COL.CHARACTER_MAXIMUM_LENGTH,
716    COL.NUMERIC_PRECISION,
717    COL.NUMERIC_SCALE,
718    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
719    LEFT JOIN
720    pg_catalog.pg_description DESCRIPTION
721    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
722    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
723        let (state, data) = self.query(sql.as_str());
724        let mut list = object! {};
725        if state {
726            for item in data.members() {
727                let mut row = object! {};
728                row["field"] = item["column_name"].clone();
729                row["comment"] = item["comment"].clone();
730                row["type"] = item["data_type"].clone();
731                row["is_nullable"] = item["is_nullable"].clone();
732                row["max_length"] = item["character_maximum_length"].clone();
733                row["numeric_precision"] = item["numeric_precision"].clone();
734                row["numeric_scale"] = item["numeric_scale"].clone();
735                if let Some(field_name) = row["field"].as_str() {
736                    list[field_name] = row.clone();
737                }
738            }
739            list
740        } else {
741            list
742        }
743    }
744
745    fn table_is_exist(&mut self, name: &str) -> bool {
746        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
747        let (state, data) = self.query(sql.as_str());
748        match state {
749            true => {
750                for item in data.members() {
751                    if item.has_key("exists") {
752                        return item["exists"].as_bool().unwrap_or(false);
753                    }
754                }
755                false
756            }
757            false => false,
758        }
759    }
760
761    fn table(&mut self, name: &str) -> &mut Pgsql {
762        self.params = Params::default(self.connection.mode.str().as_str());
763        let table_name = format!("{}{}", self.connection.prefix, name);
764        if !super::sql_safety::validate_table_name(&table_name) {
765            error!("Invalid table name: {}", name);
766        }
767        self.params.table = table_name.clone();
768        self.params.join_table = table_name;
769        self
770    }
771
772    fn change_table(&mut self, name: &str) -> &mut Self {
773        self.params.join_table = name.to_string();
774        self
775    }
776
777    fn autoinc(&mut self) -> &mut Self {
778        self.params.autoinc = true;
779        self
780    }
781
782    fn timestamps(&mut self) -> &mut Self {
783        self.params.timestamps = true;
784        self
785    }
786
787    fn fetch_sql(&mut self) -> &mut Self {
788        self.params.sql = true;
789        self
790    }
791
792    fn order(&mut self, field: &str, by: bool) -> &mut Self {
793        self.params.order[field] = {
794            if by {
795                "DESC"
796            } else {
797                "ASC"
798            }
799        }
800        .into();
801        self
802    }
803
804    fn group(&mut self, field: &str) -> &mut Self {
805        let fields: Vec<&str> = field.split(",").collect();
806        for field in fields.iter() {
807            let field = field.to_string();
808            self.params.group[field.as_str()] = field.clone().into();
809            self.params.fields[field.as_str()] = field.clone().into();
810        }
811        self
812    }
813
814    fn distinct(&mut self) -> &mut Self {
815        self.params.distinct = true;
816        self
817    }
818
819    fn json(&mut self, field: &str) -> &mut Self {
820        let list: Vec<&str> = field.split(",").collect();
821        for item in list.iter() {
822            self.params.json[item.to_string().as_str()] = item.to_string().into();
823        }
824        self
825    }
826
827    fn location(&mut self, field: &str) -> &mut Self {
828        let list: Vec<&str> = field.split(",").collect();
829        for item in list.iter() {
830            self.params.location[item.to_string().as_str()] = item.to_string().into();
831        }
832        self
833    }
834
835    fn field(&mut self, field: &str) -> &mut Self {
836        let list: Vec<&str> = field.split(",").collect();
837        let join_table = if self.params.join_table.is_empty() {
838            self.params.table.clone()
839        } else {
840            self.params.join_table.clone()
841        };
842        for item in list.iter() {
843            let lower = item.to_lowercase();
844            let is_expr = lower.contains("count(")
845                || lower.contains("sum(")
846                || lower.contains("avg(")
847                || lower.contains("max(")
848                || lower.contains("min(")
849                || lower.contains("case ");
850            if is_expr {
851                self.params.fields[item.to_string().as_str()] = (*item).into();
852            } else if item.contains(" as ") {
853                let text = item.split(" as ").collect::<Vec<&str>>();
854                self.params.fields[item.to_string().as_str()] =
855                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
856            } else {
857                self.params.fields[item.to_string().as_str()] =
858                    format!("{join_table}.{item}").into();
859            }
860        }
861        self
862    }
863
864    fn field_raw(&mut self, expr: &str) -> &mut Self {
865        self.params.fields[expr] = expr.into();
866        self
867    }
868
869    fn hidden(&mut self, name: &str) -> &mut Self {
870        let hidden: Vec<&str> = name.split(",").collect();
871
872        let fields_list = self.table_info(self.params.clone().table.as_str());
873        let mut data = array![];
874        for item in fields_list.members() {
875            let _ = data.push(object! {
876                "name":item["field"].as_str().unwrap_or("")
877            });
878        }
879
880        for item in data.members() {
881            let name = item["name"].as_str().unwrap_or("");
882            if !hidden.contains(&name) {
883                self.params.fields[name] = name.into();
884            }
885        }
886        self
887    }
888
889    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
890        for f in field.split('|') {
891            if !super::sql_safety::validate_field_name(f) {
892                error!("Invalid field name: {}", f);
893            }
894        }
895        if !super::sql_safety::validate_compare_orator(compare) {
896            error!("Invalid compare operator: {}", compare);
897        }
898        let join_table = if self.params.join_table.is_empty() {
899            self.params.table.clone()
900        } else {
901            self.params.join_table.clone()
902        };
903        if value.is_boolean() {
904            let bool_val = value.as_bool().unwrap_or(false);
905            self.params
906                .where_and
907                .push(format!("{join_table}.{field} {compare} {bool_val}"));
908            return self;
909        }
910        match compare {
911            "between" => {
912                self.params.where_and.push(format!(
913                    "{}.{} between '{}' AND '{}'",
914                    join_table, field, value[0], value[1]
915                ));
916            }
917            "set" => {
918                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
919                let mut wheredata = vec![];
920                for item in list.iter() {
921                    wheredata.push(format!(
922                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
923                    ));
924                }
925                self.params
926                    .where_and
927                    .push(format!("({})", wheredata.join(" or ")));
928            }
929            "notin" => {
930                let mut text = String::new();
931                for item in value.members() {
932                    text = format!("{text},'{item}'");
933                }
934                text = text.trim_start_matches(",").into();
935                self.params
936                    .where_and
937                    .push(format!("{join_table}.{field} not in ({text})"));
938            }
939            "is" => {
940                self.params
941                    .where_and
942                    .push(format!("{join_table}.{field} is {value}"));
943            }
944            "isnot" => {
945                self.params
946                    .where_and
947                    .push(format!("{join_table}.{field} is not {value}"));
948            }
949            "notlike" => {
950                self.params
951                    .where_and
952                    .push(format!("{join_table}.{field} not like '{value}'"));
953            }
954            "in" => {
955                if value.is_array() && value.is_empty() {
956                    self.params.where_and.push("1=0".to_string());
957                    return self;
958                }
959                let mut text = String::new();
960                if value.is_array() {
961                    for item in value.members() {
962                        text = format!("{text},'{item}'");
963                    }
964                } else if value.is_null() {
965                    text = format!("{text},null");
966                } else {
967                    let value = value.as_str().unwrap_or("");
968
969                    let value: Vec<&str> = value.split(",").collect();
970                    for item in value.iter() {
971                        text = format!("{text},'{item}'");
972                    }
973                }
974                text = text.trim_start_matches(",").into();
975
976                self.params
977                    .where_and
978                    .push(format!("{join_table}.{field} {compare} ({text})"));
979            }
980            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
981            // 用法:.where_and("tags", "json_contains", "紧急".into())
982            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
983            "json_contains" => {
984                if value.is_array() {
985                    if value.is_empty() {
986                        self.params.where_and.push("1=0".to_string());
987                    } else {
988                        let mut parts = vec![];
989                        for item in value.members() {
990                            let escaped = super::sql_safety::escape_string(&item.to_string());
991                            parts.push(format!(
992                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
993                                escaped
994                            ));
995                        }
996                        self.params
997                            .where_and
998                            .push(format!("({})", parts.join(" OR ")));
999                    }
1000                } else {
1001                    let escaped = super::sql_safety::escape_string(&value.to_string());
1002                    self.params.where_and.push(format!(
1003                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1004                        escaped
1005                    ));
1006                }
1007            }
1008            _ => {
1009                self.params
1010                    .where_and
1011                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1012            }
1013        }
1014        self
1015    }
1016
1017    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1018        for f in field.split('|') {
1019            if !super::sql_safety::validate_field_name(f) {
1020                error!("Invalid field name: {}", f);
1021            }
1022        }
1023        if !super::sql_safety::validate_compare_orator(compare) {
1024            error!("Invalid compare operator: {}", compare);
1025        }
1026        let join_table = if self.params.join_table.is_empty() {
1027            self.params.table.clone()
1028        } else {
1029            self.params.join_table.clone()
1030        };
1031
1032        if value.is_boolean() {
1033            let bool_val = value.as_bool().unwrap_or(false);
1034            self.params
1035                .where_or
1036                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1037            return self;
1038        }
1039
1040        match compare {
1041            "between" => {
1042                self.params.where_or.push(format!(
1043                    "{}.{} between '{}' AND '{}'",
1044                    join_table, field, value[0], value[1]
1045                ));
1046            }
1047            "set" => {
1048                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1049                let mut wheredata = vec![];
1050                for item in list.iter() {
1051                    wheredata.push(format!(
1052                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1053                    ));
1054                }
1055                self.params
1056                    .where_or
1057                    .push(format!("({})", wheredata.join(" or ")));
1058            }
1059            "notin" => {
1060                let mut text = String::new();
1061                for item in value.members() {
1062                    text = format!("{text},'{item}'");
1063                }
1064                text = text.trim_start_matches(",").into();
1065                self.params
1066                    .where_or
1067                    .push(format!("{join_table}.{field} not in ({text})"));
1068            }
1069            "is" => {
1070                self.params
1071                    .where_or
1072                    .push(format!("{join_table}.{field} is {value}"));
1073            }
1074            "isnot" => {
1075                self.params
1076                    .where_or
1077                    .push(format!("{join_table}.{field} is not {value}"));
1078            }
1079            "in" => {
1080                if value.is_array() && value.is_empty() {
1081                    self.params.where_or.push("1=0".to_string());
1082                    return self;
1083                }
1084                let mut text = String::new();
1085                if value.is_array() {
1086                    for item in value.members() {
1087                        text = format!("{text},'{item}'");
1088                    }
1089                } else {
1090                    let value = value.as_str().unwrap_or("");
1091                    let value: Vec<&str> = value.split(",").collect();
1092                    for item in value.iter() {
1093                        text = format!("{text},'{item}'");
1094                    }
1095                }
1096                text = text.trim_start_matches(",").into();
1097                self.params
1098                    .where_or
1099                    .push(format!("{join_table}.{field} {compare} ({text})"));
1100            }
1101            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1102            // 用法:.where_or("tags", "json_contains", "紧急".into())
1103            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1104            "json_contains" => {
1105                if value.is_array() {
1106                    if value.is_empty() {
1107                        self.params.where_or.push("1=0".to_string());
1108                    } else {
1109                        let mut parts = vec![];
1110                        for item in value.members() {
1111                            let escaped = super::sql_safety::escape_string(&item.to_string());
1112                            parts.push(format!(
1113                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1114                                escaped
1115                            ));
1116                        }
1117                        self.params
1118                            .where_or
1119                            .push(format!("({})", parts.join(" OR ")));
1120                    }
1121                } else {
1122                    let escaped = super::sql_safety::escape_string(&value.to_string());
1123                    self.params.where_or.push(format!(
1124                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1125                        escaped
1126                    ));
1127                }
1128            }
1129            _ => {
1130                self.params
1131                    .where_or
1132                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1133            }
1134        }
1135        self
1136    }
1137
1138    fn where_raw(&mut self, expr: &str) -> &mut Self {
1139        self.params.where_and.push(expr.to_string());
1140        self
1141    }
1142
1143    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1144        self.params
1145            .where_and
1146            .push(format!("\"{field}\" IN ({sub_sql})"));
1147        self
1148    }
1149
1150    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1151        self.params
1152            .where_and
1153            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1154        self
1155    }
1156
1157    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1158        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1159        self
1160    }
1161
1162    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1163        self.params
1164            .where_and
1165            .push(format!("NOT EXISTS ({sub_sql})"));
1166        self
1167    }
1168
1169    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1170        self.params.where_column = format!(
1171            "{}.{} {} {}.{}",
1172            self.params.table, field_a, compare, self.params.table, field_b
1173        );
1174        self
1175    }
1176
1177    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1178        self.params
1179            .update_column
1180            .push(format!("{field_a} = {compare}"));
1181        self
1182    }
1183
1184    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1185        self.params.page = page;
1186        self.params.limit = limit;
1187        self
1188    }
1189
1190    fn limit(&mut self, count: i32) -> &mut Self {
1191        self.params.limit_only = count;
1192        self
1193    }
1194
1195    fn column(&mut self, field: &str) -> JsonValue {
1196        self.field(field);
1197        let sql = self.params.select_sql();
1198
1199        if self.params.sql {
1200            return JsonValue::from(sql);
1201        }
1202        let (state, data) = self.query(sql.as_str());
1203        match state {
1204            true => {
1205                let mut list = array![];
1206                for item in data.members() {
1207                    if self.params.json[field].is_empty() {
1208                        let _ = list.push(item[field].clone());
1209                    } else {
1210                        let data =
1211                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1212                        let _ = list.push(data);
1213                    }
1214                }
1215                list
1216            }
1217            false => {
1218                array![]
1219            }
1220        }
1221    }
1222
1223    fn count(&mut self) -> JsonValue {
1224        self.params.fields = json::object! {};
1225        self.params.fields["count"] = "count(*) as count".to_string().into();
1226        let sql = self.params.select_sql();
1227        if self.params.sql {
1228            return JsonValue::from(sql.clone());
1229        }
1230        let (state, data) = self.query(sql.as_str());
1231        if state {
1232            data[0]["count"].clone()
1233        } else {
1234            JsonValue::from(0)
1235        }
1236    }
1237
1238    fn max(&mut self, field: &str) -> JsonValue {
1239        self.params.fields[field] = format!("max({field}) as {field}").into();
1240        let sql = self.params.select_sql();
1241        if self.params.sql {
1242            return JsonValue::from(sql.clone());
1243        }
1244        let (state, data) = self.query(sql.as_str());
1245        if state {
1246            if data.len() > 1 {
1247                return data.clone();
1248            }
1249            data[0][field].clone()
1250        } else {
1251            JsonValue::from(0)
1252        }
1253    }
1254
1255    fn min(&mut self, field: &str) -> JsonValue {
1256        self.params.fields[field] = format!("min({field}) as {field}").into();
1257        let sql = self.params.select_sql();
1258        if self.params.sql {
1259            return JsonValue::from(sql.clone());
1260        }
1261        let (state, data) = self.query(sql.as_str());
1262        if state {
1263            if data.len() > 1 {
1264                return data;
1265            }
1266            data[0][field].clone()
1267        } else {
1268            JsonValue::from(0)
1269        }
1270    }
1271
1272    fn sum(&mut self, field: &str) -> JsonValue {
1273        self.params.fields[field] = format!("sum({field}) as {field}").into();
1274        let sql = self.params.select_sql();
1275        if self.params.sql {
1276            return JsonValue::from(sql.clone());
1277        }
1278        let (state, data) = self.query(sql.as_str());
1279        match state {
1280            true => {
1281                if data.len() > 1 {
1282                    return data;
1283                }
1284                data[0][field].clone()
1285            }
1286            false => JsonValue::from(0),
1287        }
1288    }
1289
1290    fn avg(&mut self, field: &str) -> JsonValue {
1291        self.params.fields[field] = format!("avg({field}) as {field}").into();
1292        let sql = self.params.select_sql();
1293        if self.params.sql {
1294            return JsonValue::from(sql.clone());
1295        }
1296        let (state, data) = self.query(sql.as_str());
1297        if state {
1298            if data.len() > 1 {
1299                return data;
1300            }
1301            data[0][field].clone()
1302        } else {
1303            JsonValue::from(0)
1304        }
1305    }
1306
1307    fn having(&mut self, expr: &str) -> &mut Self {
1308        self.params.having.push(expr.to_string());
1309        self
1310    }
1311
1312    fn select(&mut self) -> JsonValue {
1313        let sql = self.params.select_sql();
1314        if self.params.sql {
1315            return JsonValue::from(sql.clone());
1316        }
1317        let (state, mut data) = self.query(sql.as_str());
1318        match state {
1319            true => {
1320                for (field, _) in self.params.json.entries() {
1321                    for item in data.members_mut() {
1322                        if !item[field].is_empty() {
1323                            let json = item[field].to_string();
1324                            item[field] = match json::parse(&json) {
1325                                Ok(e) => e,
1326                                Err(_) => JsonValue::from(json),
1327                            };
1328                        }
1329                    }
1330                }
1331                data.clone()
1332            }
1333            false => array![],
1334        }
1335    }
1336
1337    fn find(&mut self) -> JsonValue {
1338        self.params.page = 1;
1339        self.params.limit = 1;
1340        let sql = self.params.select_sql();
1341        if self.params.sql {
1342            return JsonValue::from(sql.clone());
1343        }
1344        let (state, mut data) = self.query(sql.as_str());
1345        match state {
1346            true => {
1347                if data.is_empty() {
1348                    return object! {};
1349                }
1350                for (field, _) in self.params.json.entries() {
1351                    if !data[0][field].is_empty() {
1352                        let json = data[0][field].to_string();
1353                        let json = json::parse(&json).unwrap_or(array![]);
1354                        data[0][field] = json;
1355                    } else {
1356                        data[0][field] = array![];
1357                    }
1358                }
1359                data[0].clone()
1360            }
1361            false => {
1362                object! {}
1363            }
1364        }
1365    }
1366
1367    fn value(&mut self, field: &str) -> JsonValue {
1368        self.params.fields = object! {};
1369        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1370        self.params.page = 1;
1371        self.params.limit = 1;
1372        let sql = self.params.select_sql();
1373        if self.params.sql {
1374            return JsonValue::from(sql.clone());
1375        }
1376        let (state, mut data) = self.query(sql.as_str());
1377        match state {
1378            true => {
1379                for (field, _) in self.params.json.entries() {
1380                    if !data[0][field].is_empty() {
1381                        let json = data[0][field].to_string();
1382                        let json = json::parse(&json).unwrap_or(array![]);
1383                        data[0][field] = json;
1384                    } else {
1385                        data[0][field] = array![];
1386                    }
1387                }
1388                data[0][field].clone()
1389            }
1390            false => {
1391                if self.connection.debug {
1392                    info!("{data:?}");
1393                }
1394                JsonValue::Null
1395            }
1396        }
1397    }
1398
1399    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1400        let fields_list = self.table_info(&self.params.table.clone());
1401        let mut fields = vec![];
1402        let mut values = vec![];
1403        if !self.params.autoinc && data["id"].is_empty() {
1404            let thread_id = format!("{:?}", std::thread::current().id());
1405            let thread_num: u64 = thread_id
1406                .trim_start_matches("ThreadId(")
1407                .trim_end_matches(")")
1408                .parse()
1409                .unwrap_or(0);
1410            data["id"] = format!(
1411                "{:X}{:X}",
1412                Local::now().timestamp_nanos_opt().unwrap_or(0),
1413                thread_num
1414            )
1415            .into();
1416        }
1417        for (field, value) in data.entries() {
1418            fields.push(format!("\"{}\"", field));
1419
1420            if value.is_string() {
1421                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1422                continue;
1423            } else if value.is_array() {
1424                if self.params.json[field].is_empty() {
1425                    let array = value
1426                        .members()
1427                        .map(|x| x.as_str().unwrap_or(""))
1428                        .collect::<Vec<&str>>()
1429                        .join(",");
1430                    values.push(format!("'{array}'"));
1431                } else {
1432                    let json = value.to_string();
1433                    let json = json.replace("'", "''");
1434                    values.push(format!("'{json}'"));
1435                }
1436                continue;
1437            } else if value.is_object() {
1438                if self.params.json[field].is_empty() {
1439                    values.push(format!("'{value}'"));
1440                } else {
1441                    let json = value.to_string();
1442                    let json = json.replace("'", "''");
1443                    values.push(format!("'{json}'"));
1444                }
1445                continue;
1446            } else if value.is_number() {
1447                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1448                if col_type == "boolean" {
1449                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1450                    values.push(format!("{bool_val}"));
1451                } else if col_type.contains("int") {
1452                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1453                } else {
1454                    values.push(format!("{value}"));
1455                }
1456                continue;
1457            } else if value.is_boolean() || value.is_null() {
1458                values.push(format!("{value}"));
1459                continue;
1460            } else {
1461                values.push(format!("'{value}'"));
1462                continue;
1463            }
1464        }
1465        let fields = fields.join(",");
1466        let values = values.join(",");
1467
1468        let sql = format!(
1469            "INSERT INTO {} ({}) VALUES ({});",
1470            self.params.table, fields, values
1471        );
1472        if self.params.sql {
1473            return JsonValue::from(sql.clone());
1474        }
1475        let (state, ids) = self.execute(sql.as_str());
1476
1477        match state {
1478            true => match self.params.autoinc {
1479                true => ids.clone(),
1480                false => data["id"].clone(),
1481            },
1482            false => {
1483                let thread_id = format!("{:?}", thread::current().id());
1484                error!("添加失败: {thread_id} {ids:?} {sql}");
1485                JsonValue::from("")
1486            }
1487        }
1488    }
1489    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1490        let fields_list = self.table_info(&self.params.table.clone());
1491        let mut fields = String::new();
1492        if !self.params.autoinc && data[0]["id"].is_empty() {
1493            data[0]["id"] = "".into();
1494        }
1495        for (field, _) in data[0].entries() {
1496            fields = format!("{fields},\"{field}\"");
1497        }
1498        fields = fields.trim_start_matches(",").to_string();
1499
1500        let core_count = num_cpus::get();
1501        let mut p = pools::Pool::new(core_count * 4);
1502
1503        let autoinc = self.params.autoinc;
1504        for list in data.members() {
1505            let mut item = list.clone();
1506            let i = br_fields::str::Code::verification_code(3);
1507            let fields_list_new = fields_list.clone();
1508            p.execute(move |pcindex| {
1509                if !autoinc && item["id"].is_empty() {
1510                    let id = format!(
1511                        "{:X}{:X}{}",
1512                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1513                        pcindex,
1514                        i
1515                    );
1516                    item["id"] = id.into();
1517                }
1518                let mut values = "".to_string();
1519                for (field, value) in item.entries() {
1520                    if value.is_string() {
1521                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1522                    } else if value.is_number() {
1523                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1524                        if col_type == "boolean" {
1525                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1526                            values = format!("{values},{bool_val}");
1527                        } else if col_type.contains("int") {
1528                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1529                        } else {
1530                            values = format!("{values},{value}");
1531                        }
1532                    } else if value.is_boolean() {
1533                        values = format!("{values},{value}");
1534                        continue;
1535                    } else {
1536                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1537                    }
1538                }
1539                values = format!("({})", values.trim_start_matches(","));
1540                array![item["id"].clone(), values]
1541            });
1542        }
1543        let (ids_list, mut values) = p.insert_all();
1544        values = values.trim_start_matches(",").to_string();
1545        let sql = format!(
1546            "INSERT INTO {} ({}) VALUES {};",
1547            self.params.table, fields, values
1548        );
1549
1550        if self.params.sql {
1551            return JsonValue::from(sql.clone());
1552        }
1553        let (state, data) = self.execute(sql.as_str());
1554        match state {
1555            true => match autoinc {
1556                true => data,
1557                false => JsonValue::from(ids_list),
1558            },
1559            false => {
1560                error!("insert_all: {data:?}");
1561                let _ = std::fs::write("pgsql_insert_all_error.log", format!("insert_all error:\n{:?}\n\nSQL:\n{}", data, sql));
1562                array![]
1563            }
1564        }
1565    }
1566    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1567        let fields_list = self.table_info(&self.params.table.clone());
1568        let mut fields = vec![];
1569        let mut values = vec![];
1570        if !self.params.autoinc && data["id"].is_empty() {
1571            let thread_id = format!("{:?}", std::thread::current().id());
1572            let thread_num: u64 = thread_id
1573                .trim_start_matches("ThreadId(")
1574                .trim_end_matches(")")
1575                .parse()
1576                .unwrap_or(0);
1577            data["id"] = format!(
1578                "{:X}{:X}",
1579                Local::now().timestamp_nanos_opt().unwrap_or(0),
1580                thread_num
1581            )
1582            .into();
1583        }
1584        for (field, value) in data.entries() {
1585            fields.push(format!("\"{}\"", field));
1586
1587            if value.is_string() {
1588                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1589                continue;
1590            } else if value.is_array() {
1591                if self.params.json[field].is_empty() {
1592                    let array = value
1593                        .members()
1594                        .map(|x| x.as_str().unwrap_or(""))
1595                        .collect::<Vec<&str>>()
1596                        .join(",");
1597                    values.push(format!("'{array}'"));
1598                } else {
1599                    let json = value.to_string();
1600                    let json = json.replace("'", "''");
1601                    values.push(format!("'{json}'"));
1602                }
1603                continue;
1604            } else if value.is_object() {
1605                if self.params.json[field].is_empty() {
1606                    values.push(format!("'{value}'"));
1607                } else {
1608                    let json = value.to_string();
1609                    let json = json.replace("'", "''");
1610                    values.push(format!("'{json}'"));
1611                }
1612                continue;
1613            } else if value.is_number() {
1614                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1615                if col_type == "boolean" {
1616                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1617                    values.push(format!("{bool_val}"));
1618                } else if col_type.contains("int") {
1619                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1620                } else {
1621                    values.push(format!("{value}"));
1622                }
1623                continue;
1624            } else if value.is_boolean() || value.is_null() {
1625                values.push(format!("{value}"));
1626                continue;
1627            } else {
1628                values.push(format!("'{value}'"));
1629                continue;
1630            }
1631        }
1632
1633        let conflict_cols: Vec<String> = conflict_fields
1634            .iter()
1635            .map(|f| format!("\"{}\"", f))
1636            .collect();
1637
1638        let update_set: Vec<String> = fields
1639            .iter()
1640            .filter(|f| {
1641                let name = f.trim_matches('"');
1642                !conflict_fields.contains(&name) && name != "id"
1643            })
1644            .map(|f| format!("{f}=EXCLUDED.{f}"))
1645            .collect();
1646
1647        let fields_str = fields.join(",");
1648        let values_str = values.join(",");
1649
1650        let sql = format!(
1651            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1652            self.params.table,
1653            fields_str,
1654            values_str,
1655            conflict_cols.join(","),
1656            update_set.join(",")
1657        );
1658        if self.params.sql {
1659            return JsonValue::from(sql.clone());
1660        }
1661        let (state, result) = self.execute(sql.as_str());
1662        match state {
1663            true => match self.params.autoinc {
1664                true => result.clone(),
1665                false => data["id"].clone(),
1666            },
1667            false => {
1668                let thread_id = format!("{:?}", thread::current().id());
1669                error!("upsert失败: {thread_id} {result:?} {sql}");
1670                JsonValue::from("")
1671            }
1672        }
1673    }
1674    fn update(&mut self, data: JsonValue) -> JsonValue {
1675        let fields_list = self.table_info(&self.params.table.clone());
1676        let mut values = vec![];
1677        for (field, value) in data.entries() {
1678            if value.is_string() {
1679                values.push(format!(
1680                    "\"{}\"='{}'",
1681                    field,
1682                    value.to_string().replace("'", "''")
1683                ));
1684            } else if value.is_number() {
1685                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1686                if col_type == "boolean" {
1687                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1688                    values.push(format!("\"{field}\"= {bool_val}"));
1689                } else if col_type.contains("int") {
1690                    values.push(format!(
1691                        "\"{}\"= {}",
1692                        field,
1693                        value.as_f64().unwrap_or(0.0) as i64
1694                    ));
1695                } else {
1696                    values.push(format!("\"{field}\"= {value}"));
1697                }
1698            } else if value.is_array() {
1699                if self.params.json[field].is_empty() {
1700                    let array = value
1701                        .members()
1702                        .map(|x| x.as_str().unwrap_or(""))
1703                        .collect::<Vec<&str>>()
1704                        .join(",");
1705                    values.push(format!("\"{field}\"='{array}'"));
1706                } else {
1707                    let json = value.to_string();
1708                    let json = json.replace("'", "''");
1709                    values.push(format!("\"{field}\"='{json}'"));
1710                }
1711                continue;
1712            } else if value.is_object() {
1713                if self.params.json[field].is_empty() {
1714                    values.push(format!("\"{field}\"='{value}'"));
1715                } else {
1716                    if value.is_empty() {
1717                        values.push(format!("\"{field}\"=''"));
1718                        continue;
1719                    }
1720                    let json = value.to_string();
1721                    let json = json.replace("'", "''");
1722                    values.push(format!("\"{field}\"='{json}'"));
1723                }
1724                continue;
1725            } else if value.is_boolean() {
1726                values.push(format!("\"{field}\"= {value}"));
1727            } else {
1728                values.push(format!("\"{field}\"=\"{value}\""));
1729            }
1730        }
1731
1732        for (field, value) in self.params.inc_dec.entries() {
1733            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1734        }
1735        if !self.params.update_column.is_empty() {
1736            values.extend(self.params.update_column.clone());
1737        }
1738        let values = values.join(",");
1739
1740        let sql = format!(
1741            "UPDATE {} SET {} {};",
1742            self.params.table.clone(),
1743            values,
1744            self.params.where_sql()
1745        );
1746        if self.params.sql {
1747            return JsonValue::from(sql.clone());
1748        }
1749        let (state, data) = self.execute(sql.as_str());
1750        if state {
1751            data
1752        } else {
1753            let thread_id = format!("{:?}", thread::current().id());
1754            error!("update: {thread_id} {data:?} {sql}");
1755            0.into()
1756        }
1757    }
1758    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1759        let fields_list = self.table_info(&self.params.table.clone());
1760        let mut values = vec![];
1761
1762        let mut ids = vec![];
1763        for (field, _) in data[0].entries() {
1764            if field == "id" {
1765                continue;
1766            }
1767            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1768            let mut fields = vec![];
1769            for row in data.members() {
1770                let value = row[field].clone();
1771                let id = row["id"].clone();
1772                ids.push(id.clone());
1773                if value.is_string() {
1774                    fields.push(format!(
1775                        "WHEN '{}' THEN '{}'",
1776                        id,
1777                        value.to_string().replace("'", "''")
1778                    ));
1779                } else if value.is_array() || value.is_object() {
1780                    if self.params.json[field].is_empty() {
1781                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1782                    } else {
1783                        let json = value.to_string();
1784                        let json = json.replace("'", "''");
1785                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1786                    }
1787                    continue;
1788                } else if value.is_number() {
1789                    if col_type == "boolean" {
1790                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1791                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1792                    } else {
1793                        fields.push(format!("WHEN '{id}' THEN {value}"));
1794                    }
1795                } else if value.is_boolean() || value.is_null() {
1796                    fields.push(format!("WHEN '{id}' THEN {value}"));
1797                } else {
1798                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1799                }
1800            }
1801            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1802        }
1803        self.where_and("id", "in", ids.into());
1804        for (field, value) in self.params.inc_dec.entries() {
1805            values.push(format!("{} = {}", field, value.to_string().clone()));
1806        }
1807
1808        let values = values.join(",");
1809        let sql = format!(
1810            "UPDATE {} SET {} {} {};",
1811            self.params.table.clone(),
1812            values,
1813            self.params.where_sql(),
1814            self.params.page_limit_sql()
1815        );
1816        if self.params.sql {
1817            return JsonValue::from(sql.clone());
1818        }
1819        let (state, data) = self.execute(sql.as_str());
1820        if state {
1821            data
1822        } else {
1823            error!("update_all: {data:?}");
1824            JsonValue::from(0)
1825        }
1826    }
1827
1828    fn delete(&mut self) -> JsonValue {
1829        let sql = format!(
1830            "delete FROM {} {} {};",
1831            self.params.table.clone(),
1832            self.params.where_sql(),
1833            self.params.page_limit_sql()
1834        );
1835        if self.params.sql {
1836            return JsonValue::from(sql.clone());
1837        }
1838        let (state, data) = self.execute(sql.as_str());
1839        match state {
1840            true => data,
1841            false => {
1842                error!("delete 失败>>> {data:?}");
1843                JsonValue::from(0)
1844            }
1845        }
1846    }
1847
1848    fn transaction(&mut self) -> bool {
1849        let thread_id = format!("{:?}", thread::current().id());
1850        let key = format!("{}{}", self.default, thread_id);
1851
1852        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1853            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1854            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1855            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1856            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1857            return true;
1858        }
1859
1860        // 获取事务连接,失败时重试2次(短退避)
1861        let mut conn = None;
1862        for attempt in 0..3u8 {
1863            match self.client.get_connect_for_transaction() {
1864                Ok(mut c) => {
1865                    if c.is_valid() {
1866                        conn = Some(c);
1867                        break;
1868                    }
1869                    warn!("事务连接无效(第{}次)", attempt + 1);
1870                    self.client.release_transaction_conn();
1871                }
1872                Err(e) => {
1873                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1874                }
1875            }
1876            if attempt < 2 {
1877                thread::sleep(std::time::Duration::from_millis(200));
1878            }
1879        }
1880        let mut conn = match conn {
1881            Some(c) => c,
1882            None => {
1883                error!("获取事务连接重试耗尽");
1884                return false;
1885            }
1886        };
1887
1888        if let Err(e) = conn.execute("START TRANSACTION") {
1889            error!("启动事务失败: {e}");
1890            self.client.release_transaction_conn();
1891            return false;
1892        }
1893
1894        if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1895            error!("设置事务隔离级别失败: {e}");
1896            let _ = conn.execute("ROLLBACK");
1897            self.client.release_transaction_conn();
1898            return false;
1899        }
1900
1901        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1902        true
1903    }
1904    fn commit(&mut self) -> bool {
1905        let thread_id = format!("{:?}", thread::current().id());
1906        let key = format!("{}{}", self.default, thread_id);
1907
1908        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1909            error!("commit: 没有活跃的事务");
1910            return false;
1911        }
1912
1913        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1914        if depth > 1 {
1915            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1916            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1917            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1918            return true;
1919        }
1920
1921        let commit_result =
1922            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1923
1924        let success = match commit_result {
1925            Some(Ok(_)) => true,
1926            Some(Err(e)) => {
1927                error!("提交事务失败: {e}");
1928                false
1929            }
1930            None => {
1931                error!("提交事务失败: 未找到连接");
1932                false
1933            }
1934        };
1935
1936        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1937        self.client.release_transaction_conn();
1938        success
1939    }
1940
1941    fn rollback(&mut self) -> bool {
1942        let thread_id = format!("{:?}", thread::current().id());
1943        let key = format!("{}{}", self.default, thread_id);
1944
1945        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1946            error!("rollback: 没有活跃的事务");
1947            return false;
1948        }
1949
1950        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1951        if depth > 1 {
1952            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1953            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1954            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1955            return true;
1956        }
1957
1958        let rollback_result =
1959            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1960
1961        let success = match rollback_result {
1962            Some(Ok(_)) => true,
1963            Some(Err(e)) => {
1964                error!("回滚失败: {e}");
1965                false
1966            }
1967            None => {
1968                error!("回滚失败: 未找到连接");
1969                false
1970            }
1971        };
1972
1973        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1974        self.client.release_transaction_conn();
1975        success
1976    }
1977
1978    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1979        let (state, data) = self.query(sql);
1980        match state {
1981            true => Ok(data),
1982            false => Err(data.to_string()),
1983        }
1984    }
1985
1986    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1987        let (state, data) = self.execute(sql);
1988        match state {
1989            true => Ok(data),
1990            false => Err(data.to_string()),
1991        }
1992    }
1993
1994    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1995        self.params.inc_dec[field] = format!("{field} + {num}").into();
1996        self
1997    }
1998
1999    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2000        self.params.inc_dec[field] = format!("{field} - {num}").into();
2001        self
2002    }
2003    fn buildsql(&mut self) -> String {
2004        self.fetch_sql();
2005        let sql = self.select().to_string();
2006        format!("( {} ) {}", sql, self.params.table)
2007    }
2008
2009    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2010        for field in fields {
2011            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2012        }
2013        self
2014    }
2015
2016    fn join(
2017        &mut self,
2018        main_table: &str,
2019        main_fields: &str,
2020        right_table: &str,
2021        right_fields: &str,
2022    ) -> &mut Self {
2023        let main_table = if main_table.is_empty() {
2024            self.params.table.clone()
2025        } else {
2026            main_table.to_string()
2027        };
2028        self.params.join_table = right_table.to_string();
2029        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2030        self
2031    }
2032
2033    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2034        let main_fields = if main_fields.is_empty() {
2035            "id"
2036        } else {
2037            main_fields
2038        };
2039        let second_fields = if second_fields.is_empty() {
2040            self.params.table.clone()
2041        } else {
2042            second_fields.to_string().clone()
2043        };
2044        let sec_table_name = format!("{}{}", table, "_2");
2045        let second_table = format!("{} {}", table, sec_table_name.clone());
2046        self.params.join_table = sec_table_name.clone();
2047        self.params.join.push(format!(
2048            " INNER JOIN {} ON {}.{} = {}.{}",
2049            second_table, self.params.table, main_fields, sec_table_name, second_fields
2050        ));
2051        self
2052    }
2053
2054    fn join_right(
2055        &mut self,
2056        main_table: &str,
2057        main_fields: &str,
2058        right_table: &str,
2059        right_fields: &str,
2060    ) -> &mut Self {
2061        let main_table = if main_table.is_empty() {
2062            self.params.table.clone()
2063        } else {
2064            main_table.to_string()
2065        };
2066        self.params.join_table = right_table.to_string();
2067        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2068        self
2069    }
2070
2071    fn join_full(
2072        &mut self,
2073        main_table: &str,
2074        main_fields: &str,
2075        right_table: &str,
2076        right_fields: &str,
2077    ) -> &mut Self {
2078        let main_table = if main_table.is_empty() {
2079            self.params.table.clone()
2080        } else {
2081            main_table.to_string()
2082        };
2083        self.params.join_table = right_table.to_string();
2084        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2085        self
2086    }
2087
2088    fn union(&mut self, sub_sql: &str) -> &mut Self {
2089        self.params.unions.push(format!("UNION {sub_sql}"));
2090        self
2091    }
2092
2093    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2094        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2095        self
2096    }
2097
2098    fn lock_for_update(&mut self) -> &mut Self {
2099        self.params.lock_mode = "FOR UPDATE".to_string();
2100        self
2101    }
2102
2103    fn lock_for_share(&mut self) -> &mut Self {
2104        self.params.lock_mode = "FOR SHARE".to_string();
2105        self
2106    }
2107}