Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
11#[derive(Clone)]
12pub struct Pgsql {
13    /// 当前连接配置
14    pub connection: Connection,
15    /// 当前选中配置
16    pub default: String,
17    pub params: Params,
18    pub client: Pools,
19}
20
21impl Pgsql {
22    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23        let port = connection
24            .hostport
25            .parse::<i32>()
26            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28        let cp_connection = connection.clone();
29        let config = object! {
30            debug: cp_connection.debug,
31            username: cp_connection.username,
32            userpass: cp_connection.userpass,
33            database: cp_connection.database,
34            hostname: cp_connection.hostname,
35            hostport: port,
36            charset: cp_connection.charset.str(),
37        };
38        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40        let pools = pgsql.pools()?;
41        Ok(Self {
42            connection,
43            default: default.clone(),
44            params: Params::default("pgsql"),
45            client: pools,
46        })
47    }
48
49    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
50        let thread_id = format!("{:?}", thread::current().id());
51        let key = format!("{}{}", self.default, thread_id);
52
53        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
54            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
55
56            match result {
57                Some(Ok(e)) => {
58                    if self.connection.debug {
59                        info!("查询成功: {} {}", thread_id.clone(), sql);
60                    }
61                    (true, e.rows)
62                }
63                Some(Err(e)) => {
64                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
65                    (false, JsonValue::from(e.to_string()))
66                }
67                None => {
68                    error!("事务查询失败: 未找到事务连接 {thread_id}");
69                    (false, JsonValue::from("未找到事务连接"))
70                }
71            }
72        } else {
73            let mut guard = match self.client.get_guard() {
74                Ok(g) => g,
75                Err(e) => {
76                    error!(
77                        "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
78                    );
79                    return (false, JsonValue::from(e.to_string()));
80                }
81            };
82            let res = guard.conn().query(sql);
83            match res {
84                Ok(e) => {
85                    if self.connection.debug {
86                        info!("查询成功: {} {}", thread_id.clone(), sql);
87                    }
88                    (true, e.rows)
89                }
90                Err(ref e) if Self::is_retriable_error(e) => {
91                    warn!("非事务查询连接断开,重试: {thread_id} {e}");
92                    drop(guard);
93                    let mut guard2 = match self.client.get_guard() {
94                        Ok(g) => g,
95                        Err(e2) => {
96                            error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e2} SQL语句: [{sql}]");
97                            return (false, JsonValue::from(e2.to_string()));
98                        }
99                    };
100                    match guard2.conn().query(sql) {
101                        Ok(e) => {
102                            if self.connection.debug {
103                                info!("查询重试成功: {} {}", thread_id.clone(), sql);
104                            }
105                            (true, e.rows)
106                        }
107                        Err(e2) => {
108                            error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e2} SQL语句: [{sql}]");
109                            (false, JsonValue::from(e2.to_string()))
110                        }
111                    }
112                }
113                Err(e) => {
114                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
115                    (false, JsonValue::from(e.to_string()))
116                }
117            }
118        }
119    }
120    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
121        let thread_id = format!("{:?}", thread::current().id());
122        let key = format!("{}{}", self.default, thread_id);
123
124        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
125            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
126
127            match result {
128                Some(Ok(e)) => {
129                    if self.connection.debug {
130                        info!("提交成功: {} {}", thread_id.clone(), sql);
131                    }
132                    if sql.contains("INSERT") {
133                        (true, e.rows)
134                    } else {
135                        (true, e.affect_count.into())
136                    }
137                }
138                Some(Err(e)) => {
139                    error!("事务提交失败: {thread_id} {e} {sql}");
140                    (false, JsonValue::from(e.to_string()))
141                }
142                None => {
143                    error!("事务执行失败: 未找到事务连接 {thread_id}");
144                    (false, JsonValue::from("未找到事务连接"))
145                }
146            }
147        } else {
148            let mut guard = match self.client.get_guard() {
149                Ok(g) => g,
150                Err(e) => {
151                    error!(
152                        "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
153                    );
154                    return (false, JsonValue::from(e.to_string()));
155                }
156            };
157            let res = guard.conn().execute(sql);
158            match res {
159                Ok(e) => {
160                    if self.connection.debug {
161                        info!("提交成功: {} {}", thread_id.clone(), sql);
162                    }
163                    if sql.contains("INSERT") {
164                        (true, e.rows)
165                    } else {
166                        (true, e.affect_count.into())
167                    }
168                }
169                Err(ref e) if Self::is_retriable_error(e) => {
170                    warn!("非事务执行连接断开,重试: {thread_id} {e}");
171                    drop(guard);
172                    let mut guard2 = match self.client.get_guard() {
173                        Ok(g) => g,
174                        Err(e2) => {
175                            error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e2} SQL语句: [{sql}]");
176                            return (false, JsonValue::from(e2.to_string()));
177                        }
178                    };
179                    match guard2.conn().execute(sql) {
180                        Ok(e) => {
181                            if self.connection.debug {
182                                info!("执行重试成功: {} {}", thread_id.clone(), sql);
183                            }
184                            if sql.contains("INSERT") {
185                                (true, e.rows)
186                            } else {
187                                (true, e.affect_count.into())
188                            }
189                        }
190                        Err(e2) => {
191                            error!("非事务执行重试失败: {thread_id} {e2} {sql}");
192                            (false, JsonValue::from(e2.to_string()))
193                        }
194                    }
195                }
196                Err(e) => {
197                    error!("非事务提交失败: {thread_id} {e} {sql}");
198                    (false, JsonValue::from(e.to_string()))
199                }
200            }
201        }
202    }
203
204    /// 判断是否为可重试的连接类错误(连接断开、IO错误)
205    fn is_retriable_error(e: &PgsqlError) -> bool {
206        matches!(e, PgsqlError::Connection(_) | PgsqlError::Io(_))
207    }
208}
209
210impl DbMode for Pgsql {
211    fn database_tables(&mut self) -> JsonValue {
212        let sql = "SHOW TABLES".to_string();
213        match self.sql(sql.as_str()) {
214            Ok(e) => {
215                let mut list = vec![];
216                for item in e.members() {
217                    for (_, value) in item.entries() {
218                        list.push(value.clone());
219                    }
220                }
221                list.into()
222            }
223            Err(_) => {
224                array![]
225            }
226        }
227    }
228
229    fn database_create(&mut self, name: &str) -> bool {
230        let sql = format!("CREATE DATABASE {name}");
231
232        let (state, data) = self.execute(sql.as_str());
233        match state {
234            true => data.as_bool().unwrap_or(true),
235            false => {
236                error!("创建数据库失败: {data:?}");
237                false
238            }
239        }
240    }
241
242    fn truncate(&mut self, table: &str) -> bool {
243        let sql = format!("TRUNCATE TABLE {table}");
244        let (state, _) = self.execute(sql.as_str());
245        state
246    }
247}
248
249impl Mode for Pgsql {
250    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
251        let mut sql = String::new();
252        let mut comments = vec![];
253
254        if !options.table_unique.is_empty() {
255            let full_name = format!(
256                "{}_unique_{}",
257                options.table_name,
258                options.table_unique.join("_")
259            );
260            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
261            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
262            let unique = format!(
263                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
264                name,
265                options.table_name,
266                options.table_unique.join(",")
267            );
268            comments.push(unique);
269        }
270
271        for row in options.table_index.iter() {
272            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
273            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
274            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
275            let index = format!(
276                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
277                name,
278                options.table_name,
279                row.join(",")
280            );
281            comments.push(index);
282        }
283
284        for (name, field) in options.table_fields.entries_mut() {
285            field["table_name"] = options.table_name.clone().into();
286            let row = br_fields::field("pgsql", name, field.clone());
287            let (col_sql, meta) = if let Some(idx) = row.find("--") {
288                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
289            } else {
290                (row.trim(), None)
291            };
292            if let Some(meta) = meta {
293                comments.push(format!(
294                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
295                    options.table_name, name, meta
296                ));
297            }
298            sql = format!("{} {},\r\n", sql, col_sql);
299        }
300
301        let primary_key = format!(
302            "CONSTRAINT {}_{} PRIMARY KEY ({})",
303            options.table_name, options.table_key, options.table_key
304        );
305        let sql = format!(
306            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
307            options.table_name,
308            sql.trim_end_matches(",\r\n"),
309            primary_key
310        );
311        comments.insert(0, sql);
312
313        for (_name, field) in options.table_fields.entries() {
314            let _ = field["mode"].as_str();
315        }
316
317        if self.params.sql {
318            let info = comments.join("\r\n");
319            return JsonValue::from(info);
320        }
321        for comment in comments {
322            let (state, _) = self.execute(comment.as_str());
323            match state {
324                true => {}
325                false => {
326                    return JsonValue::from(state);
327                }
328            }
329        }
330        JsonValue::from(true)
331    }
332
333    fn table_update(&mut self, options: TableOptions) -> JsonValue {
334        let fields_list = self.table_info(&options.table_name);
335        let mut put = vec![];
336        let mut add = vec![];
337        let mut del = vec![];
338        let mut comments = vec![];
339
340        for (key, _) in fields_list.entries() {
341            if options.table_fields[key].is_empty() {
342                del.push(key);
343            }
344        }
345        for (name, field) in options.table_fields.entries() {
346            if !fields_list[name].is_empty() {
347                let old_info = &fields_list[name];
348                let new_field_sql = br_fields::field("pgsql", name, field.clone());
349
350                let old_comment = old_info["comment"].as_str().unwrap_or("");
351                let old_type = old_info["type"].as_str().unwrap_or("");
352
353                let new_comment = if let Some(idx) = new_field_sql.find("--") {
354                    new_field_sql[idx + 2..].trim()
355                } else {
356                    ""
357                };
358
359                let comment_matches =
360                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
361                        let old_parts: Vec<&str> = old_comment.split('|').collect();
362                        let new_parts: Vec<&str> = new_comment.split('|').collect();
363                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
364                            old_parts[..4] == new_parts[..4]
365                        } else {
366                            old_comment == new_comment
367                        }
368                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
369                        let old_parts: Vec<&str> = old_comment.split('|').collect();
370                        let new_parts: Vec<&str> = new_comment.split('|').collect();
371                        if old_parts.len() >= 2
372                            && new_parts.len() >= 2
373                            && old_parts.len() == new_parts.len()
374                        {
375                            let old_filtered: Vec<&str> = old_parts
376                                .iter()
377                                .filter(|v| **v != "true" && **v != "false")
378                                .copied()
379                                .collect();
380                            let new_filtered: Vec<&str> = new_parts
381                                .iter()
382                                .filter(|v| **v != "true" && **v != "false")
383                                .copied()
384                                .collect();
385                            old_filtered == new_filtered
386                        } else {
387                            old_comment == new_comment
388                        }
389                    } else {
390                        old_comment == new_comment
391                    };
392
393                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
394                let new_type = if sql_parts.len() > 1 {
395                    sql_parts[1].to_lowercase()
396                } else {
397                    String::new()
398                };
399
400                let type_matches = match old_type {
401                    "integer" => {
402                        new_type.contains("int")
403                            && !new_type.contains("bigint")
404                            && !new_type.contains("smallint")
405                    }
406                    "bigint" => new_type.contains("bigint"),
407                    "smallint" => new_type.contains("smallint"),
408                    "boolean" => new_type.contains("boolean"),
409                    "text" => new_type.contains("text"),
410                    "character varying" => new_type.contains("varchar"),
411                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
412                    "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
413                    "double precision" => {
414                        new_type.contains("double") || new_type.contains("float8")
415                    }
416                    "real" => new_type.contains("real") || new_type.contains("float4"),
417                    "timestamp without time zone" | "timestamp with time zone" => {
418                        new_type.contains("timestamp")
419                    }
420                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
421                    "time without time zone" | "time with time zone" => {
422                        new_type.contains("time") && !new_type.contains("timestamp")
423                    }
424                    "json" | "jsonb" => new_type.contains("json"),
425                    "uuid" => new_type.contains("uuid"),
426                    "bytea" => new_type.contains("bytea"),
427                    _ => old_type == new_type,
428                };
429
430                if type_matches && comment_matches {
431                    continue;
432                }
433
434                log::debug!(
435                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
436                    options.table_name,
437                    name,
438                    type_matches,
439                    old_type,
440                    new_type,
441                    comment_matches
442                );
443                put.push(name);
444            } else {
445                add.push(name);
446            }
447        }
448
449        for name in add.iter() {
450            let name = name.to_string();
451            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
452            let rows = row.split("--").collect::<Vec<&str>>();
453            comments.push(format!(
454                r#"ALTER TABLE "{}" add {};"#,
455                options.table_name,
456                rows[0].trim()
457            ));
458            if rows.len() > 1 {
459                comments.push(format!(
460                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
461                    options.table_name,
462                    name,
463                    rows[1].trim()
464                ));
465            }
466        }
467        for name in del.iter() {
468            comments.push(format!(
469                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
470                options.table_name, name
471            ));
472        }
473        for name in put.iter() {
474            let name = name.to_string();
475            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
476            let rows = row.split("--").collect::<Vec<&str>>();
477
478            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
479
480            if sql[1].contains("BOOLEAN") {
481                let text = format!(
482                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
483                    options.table_name, name
484                );
485                comments.push(text.clone());
486                let text = format!(
487                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
488                    options.table_name, name, sql[1]
489                );
490                comments.push(text.clone());
491            } else {
492                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
493                let new_type_lower = sql[1].to_lowercase();
494                let is_date_to_numeric = (old_col_type == "date"
495                    || old_col_type.contains("timestamp"))
496                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
497                if is_date_to_numeric {
498                    comments.push(format!(
499                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
500                        options.table_name, name
501                    ));
502                    comments.push(format!(
503                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
504                        options.table_name, name, sql[1], name, name, name
505                    ));
506                } else {
507                    let text = format!(
508                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
509                        options.table_name, name, sql[1]
510                    );
511                    comments.push(text.clone());
512                }
513            };
514
515            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
516                let default_value = rows[0][default_pos + 9..].trim();
517                if !default_value.is_empty() {
518                    comments.push(format!(
519                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
520                        options.table_name, name, default_value
521                    ));
522                }
523            }
524            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
525            // 如果现有字段有 NOT NULL 约束,移除它
526            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
527                .as_str()
528                .unwrap_or("YES");
529            let old_is_required = old_is_nullable == "NO";
530
531            // 跳过主键字段,主键隐含 NOT NULL 约束
532            if old_is_required && name != options.table_key {
533                comments.push(format!(
534                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
535                    options.table_name, name
536                ));
537            }
538
539            if rows.len() > 1 {
540                comments.push(format!(
541                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
542                    options.table_name,
543                    name,
544                    rows[1].trim()
545                ));
546            }
547        }
548
549        let mut unique_new = vec![];
550        let mut index_new = vec![];
551        let mut primary_key = vec![];
552        let (_, index_list) = self.query(
553            format!(
554                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
555                options.table_name
556            )
557            .as_str(),
558        );
559        for item in index_list.members() {
560            let key_name = item["indexname"].as_str().unwrap_or("");
561            let indexdef = item["indexdef"].to_string();
562
563            if indexdef.contains(
564                format!(
565                    "CREATE UNIQUE INDEX {}_{} ON",
566                    options.table_name, options.table_key
567                )
568                .as_str(),
569            ) {
570                primary_key.push(key_name.to_string());
571                continue;
572            }
573            if indexdef.contains("CREATE UNIQUE INDEX") {
574                unique_new.push(key_name.to_string());
575                continue;
576            }
577            if indexdef.contains("CREATE INDEX") {
578                index_new.push(key_name.to_string());
579                continue;
580            }
581        }
582
583        if !options.table_unique.is_empty() {
584            let full_name = format!(
585                "{}_unique_{}",
586                options.table_name,
587                options.table_unique.join("_")
588            );
589            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
590            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
591            let unique = format!(
592                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
593                name,
594                options.table_name,
595                options.table_unique.join(",")
596            );
597            if !unique_new.contains(&name) {
598                comments.push(unique);
599            }
600            unique_new.retain(|x| *x != name);
601        }
602
603        for row in options.table_index.iter() {
604            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
605            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
606            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
607            let index = format!(
608                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
609                name,
610                options.table_name,
611                row.join(",")
612            );
613            if !index_new.contains(&name) {
614                comments.push(index);
615            }
616            index_new.retain(|x| *x != name);
617        }
618
619        for item in unique_new {
620            if item.ends_with("_pkey") {
621                continue;
622            }
623            if item.starts_with("unique_") {
624                comments.push(format!(
625                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
626                    options.table_name,
627                    item.clone()
628                ));
629            } else {
630                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
631            }
632        }
633        for item in index_new {
634            if item.ends_with("_pkey") {
635                continue;
636            }
637            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
638        }
639
640        if self.params.sql {
641            return JsonValue::from(comments.join(""));
642        }
643
644        if comments.is_empty() {
645            return JsonValue::from(-1);
646        }
647
648        for item in comments.iter() {
649            let (state, res) = self.execute(item.as_str());
650            match state {
651                true => {}
652                false => {
653                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
654                    return JsonValue::from(0);
655                }
656            }
657        }
658        JsonValue::from(1)
659    }
660
661    fn table_info(&mut self, table: &str) -> JsonValue {
662        let sql = format!(
663            "SELECT  COL.COLUMN_NAME,
664    COL.DATA_TYPE,
665    COL.IS_NULLABLE,
666    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
667    LEFT JOIN
668    pg_catalog.pg_description DESCRIPTION
669    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
670    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
671        let (state, data) = self.query(sql.as_str());
672        let mut list = object! {};
673        if state {
674            for item in data.members() {
675                let mut row = object! {};
676                row["field"] = item["column_name"].clone();
677                row["comment"] = item["comment"].clone();
678                row["type"] = item["data_type"].clone();
679                row["is_nullable"] = item["is_nullable"].clone();
680                if let Some(field_name) = row["field"].as_str() {
681                    list[field_name] = row.clone();
682                }
683            }
684            list
685        } else {
686            list
687        }
688    }
689
690    fn table_is_exist(&mut self, name: &str) -> bool {
691        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
692        let (state, data) = self.query(sql.as_str());
693        match state {
694            true => {
695                for item in data.members() {
696                    if item.has_key("exists") {
697                        return item["exists"].as_bool().unwrap_or(false);
698                    }
699                }
700                false
701            }
702            false => false,
703        }
704    }
705
706    fn table(&mut self, name: &str) -> &mut Pgsql {
707        self.params = Params::default(self.connection.mode.str().as_str());
708        let table_name = format!("{}{}", self.connection.prefix, name);
709        if !super::sql_safety::validate_table_name(&table_name) {
710            error!("Invalid table name: {}", name);
711        }
712        self.params.table = table_name.clone();
713        self.params.join_table = table_name;
714        self
715    }
716
717    fn change_table(&mut self, name: &str) -> &mut Self {
718        self.params.join_table = name.to_string();
719        self
720    }
721
722    fn autoinc(&mut self) -> &mut Self {
723        self.params.autoinc = true;
724        self
725    }
726
727    fn timestamps(&mut self) -> &mut Self {
728        self.params.timestamps = true;
729        self
730    }
731
732    fn fetch_sql(&mut self) -> &mut Self {
733        self.params.sql = true;
734        self
735    }
736
737    fn order(&mut self, field: &str, by: bool) -> &mut Self {
738        self.params.order[field] = {
739            if by {
740                "DESC"
741            } else {
742                "ASC"
743            }
744        }
745        .into();
746        self
747    }
748
749    fn group(&mut self, field: &str) -> &mut Self {
750        let fields: Vec<&str> = field.split(",").collect();
751        for field in fields.iter() {
752            let field = field.to_string();
753            self.params.group[field.as_str()] = field.clone().into();
754            self.params.fields[field.as_str()] = field.clone().into();
755        }
756        self
757    }
758
759    fn distinct(&mut self) -> &mut Self {
760        self.params.distinct = true;
761        self
762    }
763
764    fn json(&mut self, field: &str) -> &mut Self {
765        let list: Vec<&str> = field.split(",").collect();
766        for item in list.iter() {
767            self.params.json[item.to_string().as_str()] = item.to_string().into();
768        }
769        self
770    }
771
772    fn location(&mut self, field: &str) -> &mut Self {
773        let list: Vec<&str> = field.split(",").collect();
774        for item in list.iter() {
775            self.params.location[item.to_string().as_str()] = item.to_string().into();
776        }
777        self
778    }
779
780    fn field(&mut self, field: &str) -> &mut Self {
781        let list: Vec<&str> = field.split(",").collect();
782        let join_table = if self.params.join_table.is_empty() {
783            self.params.table.clone()
784        } else {
785            self.params.join_table.clone()
786        };
787        for item in list.iter() {
788            let lower = item.to_lowercase();
789            let is_expr = lower.contains("count(")
790                || lower.contains("sum(")
791                || lower.contains("avg(")
792                || lower.contains("max(")
793                || lower.contains("min(")
794                || lower.contains("case ");
795            if is_expr {
796                self.params.fields[item.to_string().as_str()] = (*item).into();
797            } else if item.contains(" as ") {
798                let text = item.split(" as ").collect::<Vec<&str>>();
799                self.params.fields[item.to_string().as_str()] =
800                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
801            } else {
802                self.params.fields[item.to_string().as_str()] =
803                    format!("{join_table}.{item}").into();
804            }
805        }
806        self
807    }
808
809    fn field_raw(&mut self, expr: &str) -> &mut Self {
810        self.params.fields[expr] = expr.into();
811        self
812    }
813
814    fn hidden(&mut self, name: &str) -> &mut Self {
815        let hidden: Vec<&str> = name.split(",").collect();
816
817        let fields_list = self.table_info(self.params.clone().table.as_str());
818        let mut data = array![];
819        for item in fields_list.members() {
820            let _ = data.push(object! {
821                "name":item["field"].as_str().unwrap_or("")
822            });
823        }
824
825        for item in data.members() {
826            let name = item["name"].as_str().unwrap_or("");
827            if !hidden.contains(&name) {
828                self.params.fields[name] = name.into();
829            }
830        }
831        self
832    }
833
834    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
835        for f in field.split('|') {
836            if !super::sql_safety::validate_field_name(f) {
837                error!("Invalid field name: {}", f);
838            }
839        }
840        if !super::sql_safety::validate_compare_orator(compare) {
841            error!("Invalid compare operator: {}", compare);
842        }
843        let join_table = if self.params.join_table.is_empty() {
844            self.params.table.clone()
845        } else {
846            self.params.join_table.clone()
847        };
848        if value.is_boolean() {
849            let bool_val = value.as_bool().unwrap_or(false);
850            self.params
851                .where_and
852                .push(format!("{join_table}.{field} {compare} {bool_val}"));
853            return self;
854        }
855        match compare {
856            "between" => {
857                self.params.where_and.push(format!(
858                    "{}.{} between '{}' AND '{}'",
859                    join_table, field, value[0], value[1]
860                ));
861            }
862            "set" => {
863                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
864                let mut wheredata = vec![];
865                for item in list.iter() {
866                    wheredata.push(format!(
867                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
868                    ));
869                }
870                self.params
871                    .where_and
872                    .push(format!("({})", wheredata.join(" or ")));
873            }
874            "notin" => {
875                let mut text = String::new();
876                for item in value.members() {
877                    text = format!("{text},'{item}'");
878                }
879                text = text.trim_start_matches(",").into();
880                self.params
881                    .where_and
882                    .push(format!("{join_table}.{field} not in ({text})"));
883            }
884            "is" => {
885                self.params
886                    .where_and
887                    .push(format!("{join_table}.{field} is {value}"));
888            }
889            "isnot" => {
890                self.params
891                    .where_and
892                    .push(format!("{join_table}.{field} is not {value}"));
893            }
894            "notlike" => {
895                self.params
896                    .where_and
897                    .push(format!("{join_table}.{field} not like '{value}'"));
898            }
899            "in" => {
900                if value.is_array() && value.is_empty() {
901                    self.params.where_and.push("1=0".to_string());
902                    return self;
903                }
904                let mut text = String::new();
905                if value.is_array() {
906                    for item in value.members() {
907                        text = format!("{text},'{item}'");
908                    }
909                } else if value.is_null() {
910                    text = format!("{text},null");
911                } else {
912                    let value = value.as_str().unwrap_or("");
913
914                    let value: Vec<&str> = value.split(",").collect();
915                    for item in value.iter() {
916                        text = format!("{text},'{item}'");
917                    }
918                }
919                text = text.trim_start_matches(",").into();
920
921                self.params
922                    .where_and
923                    .push(format!("{join_table}.{field} {compare} ({text})"));
924            }
925            _ => {
926                self.params
927                    .where_and
928                    .push(format!("{join_table}.{field} {compare} '{value}'"));
929            }
930        }
931        self
932    }
933
934    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
935        for f in field.split('|') {
936            if !super::sql_safety::validate_field_name(f) {
937                error!("Invalid field name: {}", f);
938            }
939        }
940        if !super::sql_safety::validate_compare_orator(compare) {
941            error!("Invalid compare operator: {}", compare);
942        }
943        let join_table = if self.params.join_table.is_empty() {
944            self.params.table.clone()
945        } else {
946            self.params.join_table.clone()
947        };
948
949        if value.is_boolean() {
950            let bool_val = value.as_bool().unwrap_or(false);
951            self.params
952                .where_or
953                .push(format!("{join_table}.{field} {compare} {bool_val}"));
954            return self;
955        }
956
957        match compare {
958            "between" => {
959                self.params.where_or.push(format!(
960                    "{}.{} between '{}' AND '{}'",
961                    join_table, field, value[0], value[1]
962                ));
963            }
964            "set" => {
965                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
966                let mut wheredata = vec![];
967                for item in list.iter() {
968                    wheredata.push(format!(
969                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
970                    ));
971                }
972                self.params
973                    .where_or
974                    .push(format!("({})", wheredata.join(" or ")));
975            }
976            "notin" => {
977                let mut text = String::new();
978                for item in value.members() {
979                    text = format!("{text},'{item}'");
980                }
981                text = text.trim_start_matches(",").into();
982                self.params
983                    .where_or
984                    .push(format!("{join_table}.{field} not in ({text})"));
985            }
986            "is" => {
987                self.params
988                    .where_or
989                    .push(format!("{join_table}.{field} is {value}"));
990            }
991            "isnot" => {
992                self.params
993                    .where_or
994                    .push(format!("{join_table}.{field} is not {value}"));
995            }
996            "in" => {
997                if value.is_array() && value.is_empty() {
998                    self.params.where_or.push("1=0".to_string());
999                    return self;
1000                }
1001                let mut text = String::new();
1002                if value.is_array() {
1003                    for item in value.members() {
1004                        text = format!("{text},'{item}'");
1005                    }
1006                } else {
1007                    let value = value.as_str().unwrap_or("");
1008                    let value: Vec<&str> = value.split(",").collect();
1009                    for item in value.iter() {
1010                        text = format!("{text},'{item}'");
1011                    }
1012                }
1013                text = text.trim_start_matches(",").into();
1014                self.params
1015                    .where_or
1016                    .push(format!("{join_table}.{field} {compare} ({text})"));
1017            }
1018            _ => {
1019                self.params
1020                    .where_or
1021                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1022            }
1023        }
1024        self
1025    }
1026
1027    fn where_raw(&mut self, expr: &str) -> &mut Self {
1028        self.params.where_and.push(expr.to_string());
1029        self
1030    }
1031
1032    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1033        self.params
1034            .where_and
1035            .push(format!("\"{field}\" IN ({sub_sql})"));
1036        self
1037    }
1038
1039    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1040        self.params
1041            .where_and
1042            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1043        self
1044    }
1045
1046    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1047        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1048        self
1049    }
1050
1051    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1052        self.params
1053            .where_and
1054            .push(format!("NOT EXISTS ({sub_sql})"));
1055        self
1056    }
1057
1058    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1059        self.params.where_column = format!(
1060            "{}.{} {} {}.{}",
1061            self.params.table, field_a, compare, self.params.table, field_b
1062        );
1063        self
1064    }
1065
1066    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1067        self.params
1068            .update_column
1069            .push(format!("{field_a} = {compare}"));
1070        self
1071    }
1072
1073    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1074        self.params.page = page;
1075        self.params.limit = limit;
1076        self
1077    }
1078
1079    fn limit(&mut self, count: i32) -> &mut Self {
1080        self.params.limit_only = count;
1081        self
1082    }
1083
1084    fn column(&mut self, field: &str) -> JsonValue {
1085        self.field(field);
1086        let sql = self.params.select_sql();
1087
1088        if self.params.sql {
1089            return JsonValue::from(sql);
1090        }
1091        let (state, data) = self.query(sql.as_str());
1092        match state {
1093            true => {
1094                let mut list = array![];
1095                for item in data.members() {
1096                    if self.params.json[field].is_empty() {
1097                        let _ = list.push(item[field].clone());
1098                    } else {
1099                        let data =
1100                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1101                        let _ = list.push(data);
1102                    }
1103                }
1104                list
1105            }
1106            false => {
1107                array![]
1108            }
1109        }
1110    }
1111
1112    fn count(&mut self) -> JsonValue {
1113        self.params.fields = json::object! {};
1114        self.params.fields["count"] = "count(*) as count".to_string().into();
1115        let sql = self.params.select_sql();
1116        if self.params.sql {
1117            return JsonValue::from(sql.clone());
1118        }
1119        let (state, data) = self.query(sql.as_str());
1120        if state {
1121            data[0]["count"].clone()
1122        } else {
1123            JsonValue::from(0)
1124        }
1125    }
1126
1127    fn max(&mut self, field: &str) -> JsonValue {
1128        self.params.fields[field] = format!("max({field}) as {field}").into();
1129        let sql = self.params.select_sql();
1130        if self.params.sql {
1131            return JsonValue::from(sql.clone());
1132        }
1133        let (state, data) = self.query(sql.as_str());
1134        if state {
1135            if data.len() > 1 {
1136                return data.clone();
1137            }
1138            data[0][field].clone()
1139        } else {
1140            JsonValue::from(0)
1141        }
1142    }
1143
1144    fn min(&mut self, field: &str) -> JsonValue {
1145        self.params.fields[field] = format!("min({field}) as {field}").into();
1146        let sql = self.params.select_sql();
1147        if self.params.sql {
1148            return JsonValue::from(sql.clone());
1149        }
1150        let (state, data) = self.query(sql.as_str());
1151        if state {
1152            if data.len() > 1 {
1153                return data;
1154            }
1155            data[0][field].clone()
1156        } else {
1157            JsonValue::from(0)
1158        }
1159    }
1160
1161    fn sum(&mut self, field: &str) -> JsonValue {
1162        self.params.fields[field] = format!("sum({field}) as {field}").into();
1163        let sql = self.params.select_sql();
1164        if self.params.sql {
1165            return JsonValue::from(sql.clone());
1166        }
1167        let (state, data) = self.query(sql.as_str());
1168        match state {
1169            true => {
1170                if data.len() > 1 {
1171                    return data;
1172                }
1173                data[0][field].clone()
1174            }
1175            false => JsonValue::from(0),
1176        }
1177    }
1178
1179    fn avg(&mut self, field: &str) -> JsonValue {
1180        self.params.fields[field] = format!("avg({field}) as {field}").into();
1181        let sql = self.params.select_sql();
1182        if self.params.sql {
1183            return JsonValue::from(sql.clone());
1184        }
1185        let (state, data) = self.query(sql.as_str());
1186        if state {
1187            if data.len() > 1 {
1188                return data;
1189            }
1190            data[0][field].clone()
1191        } else {
1192            JsonValue::from(0)
1193        }
1194    }
1195
1196    fn having(&mut self, expr: &str) -> &mut Self {
1197        self.params.having.push(expr.to_string());
1198        self
1199    }
1200
1201    fn select(&mut self) -> JsonValue {
1202        let sql = self.params.select_sql();
1203        if self.params.sql {
1204            return JsonValue::from(sql.clone());
1205        }
1206        let (state, mut data) = self.query(sql.as_str());
1207        match state {
1208            true => {
1209                for (field, _) in self.params.json.entries() {
1210                    for item in data.members_mut() {
1211                        if !item[field].is_empty() {
1212                            let json = item[field].to_string();
1213                            item[field] = match json::parse(&json) {
1214                                Ok(e) => e,
1215                                Err(_) => JsonValue::from(json),
1216                            };
1217                        }
1218                    }
1219                }
1220                data.clone()
1221            }
1222            false => array![],
1223        }
1224    }
1225
1226    fn find(&mut self) -> JsonValue {
1227        self.params.page = 1;
1228        self.params.limit = 1;
1229        let sql = self.params.select_sql();
1230        if self.params.sql {
1231            return JsonValue::from(sql.clone());
1232        }
1233        let (state, mut data) = self.query(sql.as_str());
1234        match state {
1235            true => {
1236                if data.is_empty() {
1237                    return object! {};
1238                }
1239                for (field, _) in self.params.json.entries() {
1240                    if !data[0][field].is_empty() {
1241                        let json = data[0][field].to_string();
1242                        let json = json::parse(&json).unwrap_or(array![]);
1243                        data[0][field] = json;
1244                    } else {
1245                        data[0][field] = array![];
1246                    }
1247                }
1248                data[0].clone()
1249            }
1250            false => {
1251                object! {}
1252            }
1253        }
1254    }
1255
1256    fn value(&mut self, field: &str) -> JsonValue {
1257        self.params.fields = object! {};
1258        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1259        self.params.page = 1;
1260        self.params.limit = 1;
1261        let sql = self.params.select_sql();
1262        if self.params.sql {
1263            return JsonValue::from(sql.clone());
1264        }
1265        let (state, mut data) = self.query(sql.as_str());
1266        match state {
1267            true => {
1268                for (field, _) in self.params.json.entries() {
1269                    if !data[0][field].is_empty() {
1270                        let json = data[0][field].to_string();
1271                        let json = json::parse(&json).unwrap_or(array![]);
1272                        data[0][field] = json;
1273                    } else {
1274                        data[0][field] = array![];
1275                    }
1276                }
1277                data[0][field].clone()
1278            }
1279            false => {
1280                if self.connection.debug {
1281                    info!("{data:?}");
1282                }
1283                JsonValue::Null
1284            }
1285        }
1286    }
1287
1288    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1289        let fields_list = self.table_info(&self.params.table.clone());
1290        let mut fields = vec![];
1291        let mut values = vec![];
1292        if !self.params.autoinc && data["id"].is_empty() {
1293            let thread_id = format!("{:?}", std::thread::current().id());
1294            let thread_num: u64 = thread_id
1295                .trim_start_matches("ThreadId(")
1296                .trim_end_matches(")")
1297                .parse()
1298                .unwrap_or(0);
1299            data["id"] = format!(
1300                "{:X}{:X}",
1301                Local::now().timestamp_nanos_opt().unwrap_or(0),
1302                thread_num
1303            )
1304            .into();
1305        }
1306        for (field, value) in data.entries() {
1307            fields.push(format!("\"{}\"", field));
1308
1309            if value.is_string() {
1310                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1311                continue;
1312            } else if value.is_array() {
1313                if self.params.json[field].is_empty() {
1314                    let array = value
1315                        .members()
1316                        .map(|x| x.as_str().unwrap_or(""))
1317                        .collect::<Vec<&str>>()
1318                        .join(",");
1319                    values.push(format!("'{array}'"));
1320                } else {
1321                    let json = value.to_string();
1322                    let json = json.replace("'", "''");
1323                    values.push(format!("'{json}'"));
1324                }
1325                continue;
1326            } else if value.is_object() {
1327                if self.params.json[field].is_empty() {
1328                    values.push(format!("'{value}'"));
1329                } else {
1330                    let json = value.to_string();
1331                    let json = json.replace("'", "''");
1332                    values.push(format!("'{json}'"));
1333                }
1334                continue;
1335            } else if value.is_number() {
1336                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1337                if col_type == "boolean" {
1338                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1339                    values.push(format!("{bool_val}"));
1340                } else if col_type.contains("int") {
1341                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1342                } else {
1343                    values.push(format!("{value}"));
1344                }
1345                continue;
1346            } else if value.is_boolean() || value.is_null() {
1347                values.push(format!("{value}"));
1348                continue;
1349            } else {
1350                values.push(format!("'{value}'"));
1351                continue;
1352            }
1353        }
1354        let fields = fields.join(",");
1355        let values = values.join(",");
1356
1357        let sql = format!(
1358            "INSERT INTO {} ({}) VALUES ({});",
1359            self.params.table, fields, values
1360        );
1361        if self.params.sql {
1362            return JsonValue::from(sql.clone());
1363        }
1364        let (state, ids) = self.execute(sql.as_str());
1365
1366        match state {
1367            true => match self.params.autoinc {
1368                true => ids.clone(),
1369                false => data["id"].clone(),
1370            },
1371            false => {
1372                let thread_id = format!("{:?}", thread::current().id());
1373                error!("添加失败: {thread_id} {ids:?} {sql}");
1374                JsonValue::from("")
1375            }
1376        }
1377    }
1378    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1379        let fields_list = self.table_info(&self.params.table.clone());
1380        let mut fields = String::new();
1381        if !self.params.autoinc && data[0]["id"].is_empty() {
1382            data[0]["id"] = "".into();
1383        }
1384        for (field, _) in data[0].entries() {
1385            fields = format!("{fields},\"{field}\"");
1386        }
1387        fields = fields.trim_start_matches(",").to_string();
1388
1389        let core_count = num_cpus::get();
1390        let mut p = pools::Pool::new(core_count * 4);
1391
1392        let autoinc = self.params.autoinc;
1393        for list in data.members() {
1394            let mut item = list.clone();
1395            let i = br_fields::str::Code::verification_code(3);
1396            let fields_list_new = fields_list.clone();
1397            p.execute(move |pcindex| {
1398                if !autoinc && item["id"].is_empty() {
1399                    let id = format!(
1400                        "{:X}{:X}{}",
1401                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1402                        pcindex,
1403                        i
1404                    );
1405                    item["id"] = id.into();
1406                }
1407                let mut values = "".to_string();
1408                for (field, value) in item.entries() {
1409                    if value.is_string() {
1410                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1411                    } else if value.is_number() {
1412                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1413                        if col_type == "boolean" {
1414                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1415                            values = format!("{values},{bool_val}");
1416                        } else if col_type.contains("int") {
1417                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1418                        } else {
1419                            values = format!("{values},{value}");
1420                        }
1421                    } else if value.is_boolean() {
1422                        values = format!("{values},{value}");
1423                        continue;
1424                    } else {
1425                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1426                    }
1427                }
1428                values = format!("({})", values.trim_start_matches(","));
1429                array![item["id"].clone(), values]
1430            });
1431        }
1432        let (ids_list, mut values) = p.insert_all();
1433        values = values.trim_start_matches(",").to_string();
1434        let sql = format!(
1435            "INSERT INTO {} ({}) VALUES {};",
1436            self.params.table, fields, values
1437        );
1438
1439        if self.params.sql {
1440            return JsonValue::from(sql.clone());
1441        }
1442        let (state, data) = self.execute(sql.as_str());
1443        match state {
1444            true => match autoinc {
1445                true => data,
1446                false => JsonValue::from(ids_list),
1447            },
1448            false => {
1449                error!("insert_all: {data:?}");
1450                array![]
1451            }
1452        }
1453    }
1454    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1455        let fields_list = self.table_info(&self.params.table.clone());
1456        let mut fields = vec![];
1457        let mut values = vec![];
1458        if !self.params.autoinc && data["id"].is_empty() {
1459            let thread_id = format!("{:?}", std::thread::current().id());
1460            let thread_num: u64 = thread_id
1461                .trim_start_matches("ThreadId(")
1462                .trim_end_matches(")")
1463                .parse()
1464                .unwrap_or(0);
1465            data["id"] = format!(
1466                "{:X}{:X}",
1467                Local::now().timestamp_nanos_opt().unwrap_or(0),
1468                thread_num
1469            )
1470            .into();
1471        }
1472        for (field, value) in data.entries() {
1473            fields.push(format!("\"{}\"", field));
1474
1475            if value.is_string() {
1476                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1477                continue;
1478            } else if value.is_array() {
1479                if self.params.json[field].is_empty() {
1480                    let array = value
1481                        .members()
1482                        .map(|x| x.as_str().unwrap_or(""))
1483                        .collect::<Vec<&str>>()
1484                        .join(",");
1485                    values.push(format!("'{array}'"));
1486                } else {
1487                    let json = value.to_string();
1488                    let json = json.replace("'", "''");
1489                    values.push(format!("'{json}'"));
1490                }
1491                continue;
1492            } else if value.is_object() {
1493                if self.params.json[field].is_empty() {
1494                    values.push(format!("'{value}'"));
1495                } else {
1496                    let json = value.to_string();
1497                    let json = json.replace("'", "''");
1498                    values.push(format!("'{json}'"));
1499                }
1500                continue;
1501            } else if value.is_number() {
1502                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1503                if col_type == "boolean" {
1504                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1505                    values.push(format!("{bool_val}"));
1506                } else if col_type.contains("int") {
1507                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1508                } else {
1509                    values.push(format!("{value}"));
1510                }
1511                continue;
1512            } else if value.is_boolean() || value.is_null() {
1513                values.push(format!("{value}"));
1514                continue;
1515            } else {
1516                values.push(format!("'{value}'"));
1517                continue;
1518            }
1519        }
1520
1521        let conflict_cols: Vec<String> = conflict_fields
1522            .iter()
1523            .map(|f| format!("\"{}\"", f))
1524            .collect();
1525
1526        let update_set: Vec<String> = fields
1527            .iter()
1528            .filter(|f| {
1529                let name = f.trim_matches('"');
1530                !conflict_fields.contains(&name) && name != "id"
1531            })
1532            .map(|f| format!("{f}=EXCLUDED.{f}"))
1533            .collect();
1534
1535        let fields_str = fields.join(",");
1536        let values_str = values.join(",");
1537
1538        let sql = format!(
1539            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1540            self.params.table,
1541            fields_str,
1542            values_str,
1543            conflict_cols.join(","),
1544            update_set.join(",")
1545        );
1546        if self.params.sql {
1547            return JsonValue::from(sql.clone());
1548        }
1549        let (state, result) = self.execute(sql.as_str());
1550        match state {
1551            true => match self.params.autoinc {
1552                true => result.clone(),
1553                false => data["id"].clone(),
1554            },
1555            false => {
1556                let thread_id = format!("{:?}", thread::current().id());
1557                error!("upsert失败: {thread_id} {result:?} {sql}");
1558                JsonValue::from("")
1559            }
1560        }
1561    }
1562    fn update(&mut self, data: JsonValue) -> JsonValue {
1563        let fields_list = self.table_info(&self.params.table.clone());
1564        let mut values = vec![];
1565        for (field, value) in data.entries() {
1566            if value.is_string() {
1567                values.push(format!(
1568                    "\"{}\"='{}'",
1569                    field,
1570                    value.to_string().replace("'", "''")
1571                ));
1572            } else if value.is_number() {
1573                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1574                if col_type == "boolean" {
1575                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1576                    values.push(format!("\"{field}\"= {bool_val}"));
1577                } else if col_type.contains("int") {
1578                    values.push(format!(
1579                        "\"{}\"= {}",
1580                        field,
1581                        value.as_f64().unwrap_or(0.0) as i64
1582                    ));
1583                } else {
1584                    values.push(format!("\"{field}\"= {value}"));
1585                }
1586            } else if value.is_array() {
1587                if self.params.json[field].is_empty() {
1588                    let array = value
1589                        .members()
1590                        .map(|x| x.as_str().unwrap_or(""))
1591                        .collect::<Vec<&str>>()
1592                        .join(",");
1593                    values.push(format!("\"{field}\"='{array}'"));
1594                } else {
1595                    let json = value.to_string();
1596                    let json = json.replace("'", "''");
1597                    values.push(format!("\"{field}\"='{json}'"));
1598                }
1599                continue;
1600            } else if value.is_object() {
1601                if self.params.json[field].is_empty() {
1602                    values.push(format!("\"{field}\"='{value}'"));
1603                } else {
1604                    if value.is_empty() {
1605                        values.push(format!("\"{field}\"=''"));
1606                        continue;
1607                    }
1608                    let json = value.to_string();
1609                    let json = json.replace("'", "''");
1610                    values.push(format!("\"{field}\"='{json}'"));
1611                }
1612                continue;
1613            } else if value.is_boolean() {
1614                values.push(format!("\"{field}\"= {value}"));
1615            } else {
1616                values.push(format!("\"{field}\"=\"{value}\""));
1617            }
1618        }
1619
1620        for (field, value) in self.params.inc_dec.entries() {
1621            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1622        }
1623        if !self.params.update_column.is_empty() {
1624            values.extend(self.params.update_column.clone());
1625        }
1626        let values = values.join(",");
1627
1628        let sql = format!(
1629            "UPDATE {} SET {} {};",
1630            self.params.table.clone(),
1631            values,
1632            self.params.where_sql()
1633        );
1634        if self.params.sql {
1635            return JsonValue::from(sql.clone());
1636        }
1637        let (state, data) = self.execute(sql.as_str());
1638        if state {
1639            data
1640        } else {
1641            let thread_id = format!("{:?}", thread::current().id());
1642            error!("update: {thread_id} {data:?} {sql}");
1643            0.into()
1644        }
1645    }
1646    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1647        let fields_list = self.table_info(&self.params.table.clone());
1648        let mut values = vec![];
1649
1650        let mut ids = vec![];
1651        for (field, _) in data[0].entries() {
1652            if field == "id" {
1653                continue;
1654            }
1655            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1656            let mut fields = vec![];
1657            for row in data.members() {
1658                let value = row[field].clone();
1659                let id = row["id"].clone();
1660                ids.push(id.clone());
1661                if value.is_string() {
1662                    fields.push(format!(
1663                        "WHEN '{}' THEN '{}'",
1664                        id,
1665                        value.to_string().replace("'", "''")
1666                    ));
1667                } else if value.is_array() || value.is_object() {
1668                    if self.params.json[field].is_empty() {
1669                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1670                    } else {
1671                        let json = value.to_string();
1672                        let json = json.replace("'", "''");
1673                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1674                    }
1675                    continue;
1676                } else if value.is_number() {
1677                    if col_type == "boolean" {
1678                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1679                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1680                    } else {
1681                        fields.push(format!("WHEN '{id}' THEN {value}"));
1682                    }
1683                } else if value.is_boolean() || value.is_null() {
1684                    fields.push(format!("WHEN '{id}' THEN {value}"));
1685                } else {
1686                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1687                }
1688            }
1689            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1690        }
1691        self.where_and("id", "in", ids.into());
1692        for (field, value) in self.params.inc_dec.entries() {
1693            values.push(format!("{} = {}", field, value.to_string().clone()));
1694        }
1695
1696        let values = values.join(",");
1697        let sql = format!(
1698            "UPDATE {} SET {} {} {};",
1699            self.params.table.clone(),
1700            values,
1701            self.params.where_sql(),
1702            self.params.page_limit_sql()
1703        );
1704        if self.params.sql {
1705            return JsonValue::from(sql.clone());
1706        }
1707        let (state, data) = self.execute(sql.as_str());
1708        if state {
1709            data
1710        } else {
1711            error!("update_all: {data:?}");
1712            JsonValue::from(0)
1713        }
1714    }
1715
1716    fn delete(&mut self) -> JsonValue {
1717        let sql = format!(
1718            "delete FROM {} {} {};",
1719            self.params.table.clone(),
1720            self.params.where_sql(),
1721            self.params.page_limit_sql()
1722        );
1723        if self.params.sql {
1724            return JsonValue::from(sql.clone());
1725        }
1726        let (state, data) = self.execute(sql.as_str());
1727        match state {
1728            true => data,
1729            false => {
1730                error!("delete 失败>>> {data:?}");
1731                JsonValue::from(0)
1732            }
1733        }
1734    }
1735
1736    fn transaction(&mut self) -> bool {
1737        let thread_id = format!("{:?}", thread::current().id());
1738        let key = format!("{}{}", self.default, thread_id);
1739
1740        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1741            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1742            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1743            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1744            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1745            return true;
1746        }
1747
1748        let mut conn = match self.client.get_connect_for_transaction() {
1749            Ok(c) => c,
1750            Err(e) => {
1751                error!("获取事务连接失败: {e}");
1752                return false;
1753            }
1754        };
1755
1756        if !conn.is_valid() {
1757            error!("事务连接无效");
1758            self.client.release_transaction_conn();
1759            return false;
1760        }
1761
1762        if let Err(e) = conn.execute("START TRANSACTION") {
1763            error!("启动事务失败: {e}");
1764            self.client.release_transaction_conn();
1765            return false;
1766        }
1767
1768        if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1769            error!("设置事务隔离级别失败: {e}");
1770            let _ = conn.execute("ROLLBACK");
1771            self.client.release_transaction_conn();
1772            return false;
1773        }
1774
1775        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1776        true
1777    }
1778    fn commit(&mut self) -> bool {
1779        let thread_id = format!("{:?}", thread::current().id());
1780        let key = format!("{}{}", self.default, thread_id);
1781
1782        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1783            error!("commit: 没有活跃的事务");
1784            return false;
1785        }
1786
1787        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1788        if depth > 1 {
1789            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1790            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1791            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1792            return true;
1793        }
1794
1795        let commit_result =
1796            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1797
1798        let success = match commit_result {
1799            Some(Ok(_)) => true,
1800            Some(Err(e)) => {
1801                error!("提交事务失败: {e}");
1802                false
1803            }
1804            None => {
1805                error!("提交事务失败: 未找到连接");
1806                false
1807            }
1808        };
1809
1810        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1811        self.client.release_transaction_conn();
1812        success
1813    }
1814
1815    fn rollback(&mut self) -> bool {
1816        let thread_id = format!("{:?}", thread::current().id());
1817        let key = format!("{}{}", self.default, thread_id);
1818
1819        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1820            error!("rollback: 没有活跃的事务");
1821            return false;
1822        }
1823
1824        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1825        if depth > 1 {
1826            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1827            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1828            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1829            return true;
1830        }
1831
1832        let rollback_result =
1833            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1834
1835        let success = match rollback_result {
1836            Some(Ok(_)) => true,
1837            Some(Err(e)) => {
1838                error!("回滚失败: {e}");
1839                false
1840            }
1841            None => {
1842                error!("回滚失败: 未找到连接");
1843                false
1844            }
1845        };
1846
1847        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1848        self.client.release_transaction_conn();
1849        success
1850    }
1851
1852    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1853        let (state, data) = self.query(sql);
1854        match state {
1855            true => Ok(data),
1856            false => Err(data.to_string()),
1857        }
1858    }
1859
1860    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1861        let (state, data) = self.execute(sql);
1862        match state {
1863            true => Ok(data),
1864            false => Err(data.to_string()),
1865        }
1866    }
1867
1868    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1869        self.params.inc_dec[field] = format!("{field} + {num}").into();
1870        self
1871    }
1872
1873    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1874        self.params.inc_dec[field] = format!("{field} - {num}").into();
1875        self
1876    }
1877    fn buildsql(&mut self) -> String {
1878        self.fetch_sql();
1879        let sql = self.select().to_string();
1880        format!("( {} ) {}", sql, self.params.table)
1881    }
1882
1883    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1884        for field in fields {
1885            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1886        }
1887        self
1888    }
1889
1890    fn join(
1891        &mut self,
1892        main_table: &str,
1893        main_fields: &str,
1894        right_table: &str,
1895        right_fields: &str,
1896    ) -> &mut Self {
1897        let main_table = if main_table.is_empty() {
1898            self.params.table.clone()
1899        } else {
1900            main_table.to_string()
1901        };
1902        self.params.join_table = right_table.to_string();
1903        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1904        self
1905    }
1906
1907    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1908        let main_fields = if main_fields.is_empty() {
1909            "id"
1910        } else {
1911            main_fields
1912        };
1913        let second_fields = if second_fields.is_empty() {
1914            self.params.table.clone()
1915        } else {
1916            second_fields.to_string().clone()
1917        };
1918        let sec_table_name = format!("{}{}", table, "_2");
1919        let second_table = format!("{} {}", table, sec_table_name.clone());
1920        self.params.join_table = sec_table_name.clone();
1921        self.params.join.push(format!(
1922            " INNER JOIN {} ON {}.{} = {}.{}",
1923            second_table, self.params.table, main_fields, sec_table_name, second_fields
1924        ));
1925        self
1926    }
1927
1928    fn join_right(
1929        &mut self,
1930        main_table: &str,
1931        main_fields: &str,
1932        right_table: &str,
1933        right_fields: &str,
1934    ) -> &mut Self {
1935        let main_table = if main_table.is_empty() {
1936            self.params.table.clone()
1937        } else {
1938            main_table.to_string()
1939        };
1940        self.params.join_table = right_table.to_string();
1941        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1942        self
1943    }
1944
1945    fn join_full(
1946        &mut self,
1947        main_table: &str,
1948        main_fields: &str,
1949        right_table: &str,
1950        right_fields: &str,
1951    ) -> &mut Self {
1952        let main_table = if main_table.is_empty() {
1953            self.params.table.clone()
1954        } else {
1955            main_table.to_string()
1956        };
1957        self.params.join_table = right_table.to_string();
1958        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1959        self
1960    }
1961
1962    fn union(&mut self, sub_sql: &str) -> &mut Self {
1963        self.params.unions.push(format!("UNION {sub_sql}"));
1964        self
1965    }
1966
1967    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1968        self.params.unions.push(format!("UNION ALL {sub_sql}"));
1969        self
1970    }
1971
1972    fn lock_for_update(&mut self) -> &mut Self {
1973        self.params.lock_mode = "FOR UPDATE".to_string();
1974        self
1975    }
1976
1977    fn lock_for_share(&mut self) -> &mut Self {
1978        self.params.lock_mode = "FOR SHARE".to_string();
1979        self
1980    }
1981}