Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use crate::TABLE_FIELDS;
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
12#[derive(Clone)]
13pub struct Pgsql {
14    /// 当前连接配置
15    pub connection: Connection,
16    /// 当前选中配置
17    pub default: String,
18    pub params: Params,
19    pub client: Pools,
20}
21
22impl Pgsql {
23    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24        let port = connection
25            .hostport
26            .parse::<i32>()
27            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29        let cp_connection = connection.clone();
30        let config = object! {
31            debug: cp_connection.debug,
32            username: cp_connection.username,
33            userpass: cp_connection.userpass,
34            database: cp_connection.database,
35            hostname: cp_connection.hostname,
36            hostport: port,
37            charset: cp_connection.charset.str(),
38            pool_max: cp_connection.pool.max_connections,
39        };
40        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
41
42        let pools = pgsql.pools()?;
43        Ok(Self {
44            connection,
45            default: default.clone(),
46            params: Params::default("pgsql"),
47            client: pools,
48        })
49    }
50
51    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
52        let thread_id = format!("{:?}", thread::current().id());
53        let key = format!("{}{}", self.default, thread_id);
54
55        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
56            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
57
58            match result {
59                Some(Ok(e)) => {
60                    if self.connection.debug {
61                        info!("查询成功: {} {}", thread_id.clone(), sql);
62                    }
63                    (true, e.rows)
64                }
65                Some(Err(e)) => {
66                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
67                    (false, JsonValue::from(e.to_string()))
68                }
69                None => {
70                    error!("事务查询失败: 未找到事务连接 {thread_id}");
71                    (false, JsonValue::from("未找到事务连接"))
72                }
73            }
74        } else {
75            let mut guard = match self.client.get_guard() {
76                Ok(g) => g,
77                Err(e) => {
78                    // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
79                    error!(
80                        "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
81                    );
82                    return (false, JsonValue::from(e.to_string()));
83                }
84            };
85            match guard.conn().query(sql) {
86                Ok(e) => {
87                    if self.connection.debug {
88                        info!("查询成功: {} {}", thread_id.clone(), sql);
89                    }
90                    (true, e.rows)
91                }
92                Err(ref e) if Self::is_retriable_error(e) => {
93                    // 查询执行时连接断开,丢弃坏连接后重试一次
94
95                    guard.discard();
96                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
97                    self.client.flush_idle();
98                    warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
99                    thread::sleep(std::time::Duration::from_millis(200));
100                    let mut guard2 = match self.client.get_guard() {
101                        Ok(g) => g,
102                        Err(e) => {
103                            error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
104                            return (false, JsonValue::from(e.to_string()));
105                        }
106                    };
107                    match guard2.conn().query(sql) {
108                        Ok(e) => {
109                            if self.connection.debug {
110                                info!("查询成功(重试): {} {}", thread_id.clone(), sql);
111                            }
112                            (true, e.rows)
113                        }
114                        Err(e) => {
115                            error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
116                            (false, JsonValue::from(e.to_string()))
117                        }
118                    }
119                }
120                Err(e) => {
121                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
122                    (false, JsonValue::from(e.to_string()))
123                }
124            }
125        }
126    }
127    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
128        let thread_id = format!("{:?}", thread::current().id());
129        let key = format!("{}{}", self.default, thread_id);
130
131        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
132            let lock_timeout = self.dynamic_table_lock_timeout(&key);
133            if self.params.table.is_empty() {
134                warn!("事务写操作未设置表名,跳过应用层表锁: {thread_id}");
135            } else if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
136                &self.params.table,
137                &key,
138                lock_timeout,
139            ) {
140                error!("获取表锁超时: {} {}", self.params.table, thread_id);
141                return (false, JsonValue::from("table lock timeout"));
142            }
143            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
144
145            match result {
146                Some(Ok(e)) => {
147                    if self.connection.debug {
148                        info!("提交成功: {} {}", thread_id.clone(), sql);
149                    }
150                    if sql.contains("INSERT") {
151                        (true, e.rows)
152                    } else {
153                        (true, e.affect_count.into())
154                    }
155                }
156                Some(Err(e)) => {
157                    error!("事务提交失败: {thread_id} {e}");
158                    (false, JsonValue::from(e.to_string()))
159                }
160                None => {
161                    error!("事务执行失败: 未找到事务连接 {thread_id}");
162                    (false, JsonValue::from("未找到事务连接"))
163                }
164            }
165        } else {
166            // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
167            let mut guard = match self.client.get_guard() {
168                Ok(g) => g,
169                Err(e) => {
170                    error!(
171                        "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
172                    );
173                    return (false, JsonValue::from(e.to_string()));
174                }
175            };
176            match guard.conn().execute(sql) {
177                Ok(e) => {
178                    if self.connection.debug {
179                        info!("提交成功: {} {}", thread_id.clone(), sql);
180                    }
181                    if sql.contains("INSERT") {
182                        (true, e.rows)
183                    } else {
184                        (true, e.affect_count.into())
185                    }
186                }
187                Err(ref e) if Self::is_retriable_error(e) => {
188                    // 执行时连接断开,丢弃坏连接后重试一次
189
190                    guard.discard();
191                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
192                    self.client.flush_idle();
193                    warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
194                    thread::sleep(std::time::Duration::from_millis(200));
195                    let mut guard2 = match self.client.get_guard() {
196                        Ok(g) => g,
197                        Err(e) => {
198                            error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
199                            return (false, JsonValue::from(e.to_string()));
200                        }
201                    };
202                    match guard2.conn().execute(sql) {
203                        Ok(e) => {
204                            if self.connection.debug {
205                                info!("提交成功(重试): {} {}", thread_id.clone(), sql);
206                            }
207                            if sql.contains("INSERT") {
208                                (true, e.rows)
209                            } else {
210                                (true, e.affect_count.into())
211                            }
212                        }
213                        Err(e) => {
214                            error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
215                            (false, JsonValue::from(e.to_string()))
216                        }
217                    }
218                }
219                Err(e) => {
220                    error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
221                    (false, JsonValue::from(e.to_string()))
222                }
223            }
224        }
225    }
226
227    /// 判断是否为可重试的连接类错误(连接断开、IO错误、超时)
228    fn is_retriable_error(e: &PgsqlError) -> bool {
229        matches!(
230            e,
231            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
232        )
233    }
234
235    fn dynamic_table_lock_timeout(&self, key: &str) -> std::time::Duration {
236        let pool = &self.connection.pool;
237        let min_secs = pool.connect_timeout_secs.max(1);
238        let base_secs = pool.write_timeout_secs.max(min_secs);
239        let max_secs = pool
240            .read_timeout_secs
241            .saturating_add(pool.write_timeout_secs)
242            .max(base_secs.saturating_add(10))
243            .max(min_secs);
244
245        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(key).max(1) as u64;
246        let (conn_count, lock_count) = PGSQL_TRANSACTION_MANAGER.stats();
247        let pressure = (conn_count + lock_count) as u64;
248        let pressure_bonus = (pressure / 2).min(10);
249        let depth_bonus = depth.saturating_sub(1).min(5);
250
251        let timeout_secs = base_secs
252            .saturating_add(pressure_bonus)
253            .saturating_add(depth_bonus)
254            .clamp(min_secs, max_secs);
255
256        std::time::Duration::from_secs(timeout_secs)
257    }
258}
259
260impl DbMode for Pgsql {
261    fn database_tables(&mut self) -> JsonValue {
262        let sql = "SHOW TABLES".to_string();
263        match self.sql(sql.as_str()) {
264            Ok(e) => {
265                let mut list = vec![];
266                for item in e.members() {
267                    for (_, value) in item.entries() {
268                        list.push(value.clone());
269                    }
270                }
271                list.into()
272            }
273            Err(_) => {
274                array![]
275            }
276        }
277    }
278
279    fn database_create(&mut self, name: &str) -> bool {
280        let sql = format!("CREATE DATABASE {name}");
281
282        let (state, data) = self.execute(sql.as_str());
283        match state {
284            true => data.as_bool().unwrap_or(true),
285            false => {
286                error!("创建数据库失败: {data:?}");
287                false
288            }
289        }
290    }
291
292    fn truncate(&mut self, table: &str) -> bool {
293        let sql = format!("TRUNCATE TABLE {table}");
294        let (state, _) = self.execute(sql.as_str());
295        state
296    }
297}
298
299impl Mode for Pgsql {
300    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
301        let mut sql = String::new();
302        let mut comments = vec![];
303
304        if !options.table_unique.is_empty() {
305            let full_name = format!(
306                "{}_unique_{}",
307                options.table_name,
308                options.table_unique.join("_")
309            );
310            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
311            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
312            let unique = format!(
313                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
314                name,
315                options.table_name,
316                options.table_unique.join(",")
317            );
318            comments.push(unique);
319        }
320
321        for row in options.table_index.iter() {
322            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
323            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
324            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
325            let index = format!(
326                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
327                name,
328                options.table_name,
329                row.join(",")
330            );
331            comments.push(index);
332        }
333
334        for (name, field) in options.table_fields.entries_mut() {
335            field["table_name"] = options.table_name.clone().into();
336            let row = br_fields::field("pgsql", name, field.clone());
337            let (col_sql, meta) = if let Some(idx) = row.find("--") {
338                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
339            } else {
340                (row.trim(), None)
341            };
342            if let Some(meta) = meta {
343                comments.push(format!(
344                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
345                    options.table_name, name, meta
346                ));
347            }
348            sql = format!("{} {},\r\n", sql, col_sql);
349        }
350
351        let primary_key = format!(
352            "CONSTRAINT {}_{} PRIMARY KEY ({})",
353            options.table_name, options.table_key, options.table_key
354        );
355        let sql = format!(
356            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
357            options.table_name,
358            sql.trim_end_matches(",\r\n"),
359            primary_key
360        );
361        comments.insert(0, sql);
362
363        for (_name, field) in options.table_fields.entries() {
364            let _ = field["mode"].as_str();
365        }
366
367        if self.params.sql {
368            let info = comments.join("\r\n");
369            return JsonValue::from(info);
370        }
371        for comment in comments {
372            let (state, _) = self.execute(comment.as_str());
373            match state {
374                true => {}
375                false => {
376                    return JsonValue::from(state);
377                }
378            }
379        }
380        JsonValue::from(true)
381    }
382
383    fn table_update(&mut self, options: TableOptions) -> JsonValue {
384        // table_update 前清除缓存,确保获取最新表结构
385        let cache_key = format!("{}{}", self.default, options.table_name);
386        let table_fields_guard = match TABLE_FIELDS.read() {
387            Ok(g) => g,
388            Err(e) => e.into_inner(),
389        };
390        if table_fields_guard.get(&cache_key).is_some() {
391            drop(table_fields_guard);
392            let mut table_fields_guard = match TABLE_FIELDS.write() {
393                Ok(g) => g,
394                Err(e) => e.into_inner(),
395            };
396            table_fields_guard.remove(&cache_key);
397        } else {
398            drop(table_fields_guard);
399        }
400        let fields_list = self.table_info(&options.table_name);
401        let mut put = vec![];
402        let mut add = vec![];
403        let mut del = vec![];
404        let mut comments = vec![];
405
406        for (key, _) in fields_list.entries() {
407            if options.table_fields[key].is_empty() {
408                del.push(key);
409            }
410        }
411        for (name, field) in options.table_fields.entries() {
412            if !fields_list[name].is_empty() {
413                let old_info = &fields_list[name];
414                let new_field_sql = br_fields::field("pgsql", name, field.clone());
415
416                let old_comment = old_info["comment"].as_str().unwrap_or("");
417                let old_type = old_info["type"].as_str().unwrap_or("");
418
419                let new_comment = if let Some(idx) = new_field_sql.find("--") {
420                    new_field_sql[idx + 2..].trim()
421                } else {
422                    ""
423                };
424
425                let comment_matches =
426                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
427                        let old_parts: Vec<&str> = old_comment.split('|').collect();
428                        let new_parts: Vec<&str> = new_comment.split('|').collect();
429                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
430                            old_parts[..4] == new_parts[..4]
431                        } else {
432                            old_comment == new_comment
433                        }
434                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
435                        let old_parts: Vec<&str> = old_comment.split('|').collect();
436                        let new_parts: Vec<&str> = new_comment.split('|').collect();
437                        if old_parts.len() >= 2
438                            && new_parts.len() >= 2
439                            && old_parts.len() == new_parts.len()
440                        {
441                            let old_filtered: Vec<&str> = old_parts
442                                .iter()
443                                .filter(|v| **v != "true" && **v != "false")
444                                .copied()
445                                .collect();
446                            let new_filtered: Vec<&str> = new_parts
447                                .iter()
448                                .filter(|v| **v != "true" && **v != "false")
449                                .copied()
450                                .collect();
451                            old_filtered == new_filtered
452                        } else {
453                            old_comment == new_comment
454                        }
455                    } else {
456                        old_comment == new_comment
457                    };
458
459                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
460                let new_type = if sql_parts.len() > 1 {
461                    sql_parts[1].to_lowercase()
462                } else {
463                    String::new()
464                };
465
466                let type_matches = match old_type {
467                    "integer" => {
468                        new_type.contains("int")
469                            && !new_type.contains("bigint")
470                            && !new_type.contains("smallint")
471                    }
472                    "bigint" => new_type.contains("bigint"),
473                    "smallint" => new_type.contains("smallint"),
474                    "boolean" => new_type.contains("boolean"),
475                    "text" => new_type.contains("text"),
476                    "character varying" => {
477                        if !new_type.contains("varchar") {
478                            false
479                        } else {
480                            let old_len = old_info["max_length"].as_i64().unwrap_or(0);
481                            let new_len = new_type
482                                .trim_start_matches("varchar(")
483                                .trim_end_matches(')')
484                                .parse::<i64>()
485                                .unwrap_or(0);
486                            let matched = old_len == new_len || new_len == 0;
487                            if !matched {
488                                log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
489                            }
490                            old_len == new_len || new_len == 0
491                        }
492                    }
493                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
494                    "numeric" => {
495                        if !(new_type.contains("numeric") || new_type.contains("decimal")) {
496                            false
497                        } else {
498                            let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
499                            let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
500                            let inner = new_type
501                                .replace("numeric(", "")
502                                .replace("decimal(", "")
503                                .replace(')', "");
504                            let parts: Vec<&str> = inner.split(',').collect();
505                            let new_prec = parts
506                                .first()
507                                .and_then(|s| s.trim().parse::<i64>().ok())
508                                .unwrap_or(0);
509                            let new_scale = parts
510                                .get(1)
511                                .and_then(|s| s.trim().parse::<i64>().ok())
512                                .unwrap_or(0);
513                            old_prec == new_prec && old_scale == new_scale
514                        }
515                    }
516                    "double precision" => {
517                        new_type.contains("double") || new_type.contains("float8")
518                    }
519                    "real" => new_type.contains("real") || new_type.contains("float4"),
520                    "timestamp without time zone" | "timestamp with time zone" => {
521                        new_type.contains("timestamp")
522                    }
523                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
524                    "time without time zone" | "time with time zone" => {
525                        new_type.contains("time") && !new_type.contains("timestamp")
526                    }
527                    "json" | "jsonb" => new_type.contains("json"),
528                    "uuid" => new_type.contains("uuid"),
529                    "bytea" => new_type.contains("bytea"),
530                    _ => old_type == new_type,
531                };
532
533                if type_matches && comment_matches {
534                    continue;
535                }
536
537                log::debug!(
538                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
539                    options.table_name,
540                    name,
541                    type_matches,
542                    old_type,
543                    new_type,
544                    comment_matches
545                );
546                put.push(name);
547            } else {
548                add.push(name);
549            }
550        }
551
552        for name in add.iter() {
553            let name = name.to_string();
554            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
555            let rows = row.split("--").collect::<Vec<&str>>();
556            comments.push(format!(
557                r#"ALTER TABLE "{}" add {};"#,
558                options.table_name,
559                rows[0].trim()
560            ));
561            if rows.len() > 1 {
562                comments.push(format!(
563                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
564                    options.table_name,
565                    name,
566                    rows[1].trim()
567                ));
568            }
569        }
570        for name in del.iter() {
571            comments.push(format!(
572                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
573                options.table_name, name
574            ));
575        }
576        for name in put.iter() {
577            let name = name.to_string();
578            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
579            let rows = row.split("--").collect::<Vec<&str>>();
580
581            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
582
583            if sql[1].contains("BOOLEAN") {
584                let text = format!(
585                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
586                    options.table_name, name
587                );
588                comments.push(text.clone());
589                let text = format!(
590                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
591                    options.table_name, name, sql[1]
592                );
593                comments.push(text.clone());
594            } else {
595                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
596                let new_type_lower = sql[1].to_lowercase();
597                let is_date_to_numeric = (old_col_type == "date"
598                    || old_col_type.contains("timestamp"))
599                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
600                if is_date_to_numeric {
601                    comments.push(format!(
602                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
603                        options.table_name, name
604                    ));
605                    comments.push(format!(
606                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
607                        options.table_name, name, sql[1], name, name, name
608                    ));
609                } else {
610                    let text = format!(
611                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
612                        options.table_name, name, sql[1]
613                    );
614                    comments.push(text.clone());
615                }
616            };
617
618            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
619                let default_value = rows[0][default_pos + 9..].trim();
620                if !default_value.is_empty() {
621                    comments.push(format!(
622                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
623                        options.table_name, name, default_value
624                    ));
625                }
626            }
627            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
628            // 如果现有字段有 NOT NULL 约束,移除它
629            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
630                .as_str()
631                .unwrap_or("YES");
632            let old_is_required = old_is_nullable == "NO";
633
634            // 跳过主键字段,主键隐含 NOT NULL 约束
635            if old_is_required && name != options.table_key {
636                comments.push(format!(
637                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
638                    options.table_name, name
639                ));
640            }
641
642            if rows.len() > 1 {
643                comments.push(format!(
644                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
645                    options.table_name,
646                    name,
647                    rows[1].trim()
648                ));
649            }
650        }
651
652        let mut unique_new = vec![];
653        let mut index_new = vec![];
654        let mut primary_key = vec![];
655        let (_, index_list) = self.query(
656            format!(
657                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
658                options.table_name
659            )
660            .as_str(),
661        );
662        for item in index_list.members() {
663            let key_name = item["indexname"].as_str().unwrap_or("");
664            let indexdef = item["indexdef"].to_string();
665
666            if indexdef.contains(
667                format!(
668                    "CREATE UNIQUE INDEX {}_{} ON",
669                    options.table_name, options.table_key
670                )
671                .as_str(),
672            ) {
673                primary_key.push(key_name.to_string());
674                continue;
675            }
676            if indexdef.contains("CREATE UNIQUE INDEX") {
677                unique_new.push(key_name.to_string());
678                continue;
679            }
680            if indexdef.contains("CREATE INDEX") {
681                index_new.push(key_name.to_string());
682                continue;
683            }
684        }
685
686        if !options.table_unique.is_empty() {
687            let full_name = format!(
688                "{}_unique_{}",
689                options.table_name,
690                options.table_unique.join("_")
691            );
692            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
693            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
694            let unique = format!(
695                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
696                name,
697                options.table_name,
698                options.table_unique.join(",")
699            );
700            if !unique_new.contains(&name) {
701                comments.push(unique);
702            }
703            unique_new.retain(|x| *x != name);
704        }
705
706        for row in options.table_index.iter() {
707            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
708            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
709            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
710            let index = format!(
711                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
712                name,
713                options.table_name,
714                row.join(",")
715            );
716            if !index_new.contains(&name) {
717                comments.push(index);
718            }
719            index_new.retain(|x| *x != name);
720        }
721
722        for item in unique_new {
723            if item.ends_with("_pkey") {
724                continue;
725            }
726            if item.starts_with("unique_") {
727                comments.push(format!(
728                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
729                    options.table_name,
730                    item.clone()
731                ));
732            } else {
733                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
734            }
735        }
736        for item in index_new {
737            if item.ends_with("_pkey") {
738                continue;
739            }
740            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
741        }
742
743        if self.params.sql {
744            return JsonValue::from(comments.join(""));
745        }
746
747        if comments.is_empty() {
748            return JsonValue::from(-1);
749        }
750
751        for item in comments.iter() {
752            let (state, res) = self.execute(item.as_str());
753            match state {
754                true => {}
755                false => {
756                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
757                    return JsonValue::from(0);
758                }
759            }
760        }
761        JsonValue::from(1)
762    }
763
764    fn table_info(&mut self, table: &str) -> JsonValue {
765        // 读缓存(与 MySQL 一致的 TABLE_FIELDS RwLock 缓存)
766        let cache_key = format!("{}{}", self.default, table);
767        let table_fields_guard = match TABLE_FIELDS.read() {
768            Ok(g) => g,
769            Err(e) => e.into_inner(),
770        };
771        if let Some(cached) = table_fields_guard.get(&cache_key) {
772            return cached.clone();
773        }
774        drop(table_fields_guard);
775        let sql = format!(
776            "SELECT  COL.COLUMN_NAME,
777    COL.DATA_TYPE,
778    COL.IS_NULLABLE,
779    COL.CHARACTER_MAXIMUM_LENGTH,
780    COL.NUMERIC_PRECISION,
781    COL.NUMERIC_SCALE,
782    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
783    LEFT JOIN
784    pg_catalog.pg_description DESCRIPTION
785    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
786    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
787        let (state, data) = self.query(sql.as_str());
788        let mut list = object! {};
789        if state {
790            for item in data.members() {
791                let mut row = object! {};
792                row["field"] = item["column_name"].clone();
793                row["comment"] = item["comment"].clone();
794                row["type"] = item["data_type"].clone();
795                row["is_nullable"] = item["is_nullable"].clone();
796                row["max_length"] = item["character_maximum_length"].clone();
797                row["numeric_precision"] = item["numeric_precision"].clone();
798                row["numeric_scale"] = item["numeric_scale"].clone();
799                if let Some(field_name) = row["field"].as_str() {
800                    list[field_name] = row.clone();
801                }
802            }
803            // 写入缓存
804            let mut table_fields_guard = match TABLE_FIELDS.write() {
805                Ok(g) => g,
806                Err(e) => e.into_inner(),
807            };
808            table_fields_guard.insert(cache_key, list.clone());
809            list
810        } else {
811            list
812        }
813    }
814
815    fn table_is_exist(&mut self, name: &str) -> bool {
816        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
817        let (state, data) = self.query(sql.as_str());
818        match state {
819            true => {
820                for item in data.members() {
821                    if item.has_key("exists") {
822                        return item["exists"].as_bool().unwrap_or(false);
823                    }
824                }
825                false
826            }
827            false => false,
828        }
829    }
830
831    fn table(&mut self, name: &str) -> &mut Pgsql {
832        self.params = Params::default(self.connection.mode.str().as_str());
833        let table_name = format!("{}{}", self.connection.prefix, name);
834        if !super::sql_safety::validate_table_name(&table_name) {
835            error!("Invalid table name: {}", name);
836        }
837        self.params.table = table_name.clone();
838        self.params.join_table = table_name;
839        self
840    }
841
842    fn change_table(&mut self, name: &str) -> &mut Self {
843        self.params.join_table = name.to_string();
844        self
845    }
846
847    fn autoinc(&mut self) -> &mut Self {
848        self.params.autoinc = true;
849        self
850    }
851
852    fn timestamps(&mut self) -> &mut Self {
853        self.params.timestamps = true;
854        self
855    }
856
857    fn fetch_sql(&mut self) -> &mut Self {
858        self.params.sql = true;
859        self
860    }
861
862    fn order(&mut self, field: &str, by: bool) -> &mut Self {
863        self.params.order[field] = {
864            if by {
865                "DESC"
866            } else {
867                "ASC"
868            }
869        }
870        .into();
871        self
872    }
873
874    fn group(&mut self, field: &str) -> &mut Self {
875        let fields: Vec<&str> = field.split(",").collect();
876        for field in fields.iter() {
877            let field = field.to_string();
878            self.params.group[field.as_str()] = field.clone().into();
879            self.params.fields[field.as_str()] = field.clone().into();
880        }
881        self
882    }
883
884    fn distinct(&mut self) -> &mut Self {
885        self.params.distinct = true;
886        self
887    }
888
889    fn json(&mut self, field: &str) -> &mut Self {
890        let list: Vec<&str> = field.split(",").collect();
891        for item in list.iter() {
892            self.params.json[item.to_string().as_str()] = item.to_string().into();
893        }
894        self
895    }
896
897    fn location(&mut self, field: &str) -> &mut Self {
898        let list: Vec<&str> = field.split(",").collect();
899        for item in list.iter() {
900            self.params.location[item.to_string().as_str()] = item.to_string().into();
901        }
902        self
903    }
904
905    fn field(&mut self, field: &str) -> &mut Self {
906        let list: Vec<&str> = field.split(",").collect();
907        let join_table = if self.params.join_table.is_empty() {
908            self.params.table.clone()
909        } else {
910            self.params.join_table.clone()
911        };
912        for item in list.iter() {
913            let lower = item.to_lowercase();
914            let is_expr = lower.contains("count(")
915                || lower.contains("sum(")
916                || lower.contains("avg(")
917                || lower.contains("max(")
918                || lower.contains("min(")
919                || lower.contains("case ");
920            if is_expr {
921                self.params.fields[item.to_string().as_str()] = (*item).into();
922            } else if item.contains(" as ") {
923                let text = item.split(" as ").collect::<Vec<&str>>();
924                self.params.fields[item.to_string().as_str()] =
925                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
926            } else {
927                self.params.fields[item.to_string().as_str()] =
928                    format!("{join_table}.{item}").into();
929            }
930        }
931        self
932    }
933
934    fn field_raw(&mut self, expr: &str) -> &mut Self {
935        self.params.fields[expr] = expr.into();
936        self
937    }
938
939    fn hidden(&mut self, name: &str) -> &mut Self {
940        let hidden: Vec<&str> = name.split(",").collect();
941
942        let fields_list = self.table_info(self.params.clone().table.as_str());
943        let mut data = array![];
944        for item in fields_list.members() {
945            let _ = data.push(object! {
946                "name":item["field"].as_str().unwrap_or("")
947            });
948        }
949
950        for item in data.members() {
951            let name = item["name"].as_str().unwrap_or("");
952            if !hidden.contains(&name) {
953                self.params.fields[name] = name.into();
954            }
955        }
956        self
957    }
958
959    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
960        for f in field.split('|') {
961            if !super::sql_safety::validate_field_name(f) {
962                error!("Invalid field name: {}", f);
963            }
964        }
965        if !super::sql_safety::validate_compare_orator(compare) {
966            error!("Invalid compare operator: {}", compare);
967        }
968        let join_table = if self.params.join_table.is_empty() {
969            self.params.table.clone()
970        } else {
971            self.params.join_table.clone()
972        };
973        if value.is_boolean() {
974            let bool_val = value.as_bool().unwrap_or(false);
975            self.params
976                .where_and
977                .push(format!("{join_table}.{field} {compare} {bool_val}"));
978            return self;
979        }
980        match compare {
981            "between" => {
982                self.params.where_and.push(format!(
983                    "{}.{} between '{}' AND '{}'",
984                    join_table, field, value[0], value[1]
985                ));
986            }
987            "set" => {
988                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
989                let mut wheredata = vec![];
990                for item in list.iter() {
991                    wheredata.push(format!(
992                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
993                    ));
994                }
995                self.params
996                    .where_and
997                    .push(format!("({})", wheredata.join(" or ")));
998            }
999            "notin" => {
1000                let mut text = String::new();
1001                for item in value.members() {
1002                    text = format!("{text},'{item}'");
1003                }
1004                text = text.trim_start_matches(",").into();
1005                self.params
1006                    .where_and
1007                    .push(format!("{join_table}.{field} not in ({text})"));
1008            }
1009            "is" => {
1010                self.params
1011                    .where_and
1012                    .push(format!("{join_table}.{field} is {value}"));
1013            }
1014            "isnot" => {
1015                self.params
1016                    .where_and
1017                    .push(format!("{join_table}.{field} is not {value}"));
1018            }
1019            "notlike" => {
1020                self.params
1021                    .where_and
1022                    .push(format!("{join_table}.{field} not like '{value}'"));
1023            }
1024            "in" => {
1025                if value.is_array() && value.is_empty() {
1026                    self.params.where_and.push("1=0".to_string());
1027                    return self;
1028                }
1029                let mut text = String::new();
1030                if value.is_array() {
1031                    for item in value.members() {
1032                        text = format!("{text},'{item}'");
1033                    }
1034                } else if value.is_null() {
1035                    text = format!("{text},null");
1036                } else {
1037                    let value = value.as_str().unwrap_or("");
1038
1039                    let value: Vec<&str> = value.split(",").collect();
1040                    for item in value.iter() {
1041                        text = format!("{text},'{item}'");
1042                    }
1043                }
1044                text = text.trim_start_matches(",").into();
1045
1046                self.params
1047                    .where_and
1048                    .push(format!("{join_table}.{field} {compare} ({text})"));
1049            }
1050            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1051            // 用法:.where_and("tags", "json_contains", "紧急".into())
1052            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
1053            "json_contains" => {
1054                if value.is_array() {
1055                    if value.is_empty() {
1056                        self.params.where_and.push("1=0".to_string());
1057                    } else {
1058                        let mut parts = vec![];
1059                        for item in value.members() {
1060                            let escaped = super::sql_safety::escape_string(&item.to_string());
1061                            parts.push(format!(
1062                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1063                                escaped
1064                            ));
1065                        }
1066                        self.params
1067                            .where_and
1068                            .push(format!("({})", parts.join(" OR ")));
1069                    }
1070                } else {
1071                    let escaped = super::sql_safety::escape_string(&value.to_string());
1072                    self.params.where_and.push(format!(
1073                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1074                        escaped
1075                    ));
1076                }
1077            }
1078            _ => {
1079                self.params
1080                    .where_and
1081                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1082            }
1083        }
1084        self
1085    }
1086
1087    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1088        for f in field.split('|') {
1089            if !super::sql_safety::validate_field_name(f) {
1090                error!("Invalid field name: {}", f);
1091            }
1092        }
1093        if !super::sql_safety::validate_compare_orator(compare) {
1094            error!("Invalid compare operator: {}", compare);
1095        }
1096        let join_table = if self.params.join_table.is_empty() {
1097            self.params.table.clone()
1098        } else {
1099            self.params.join_table.clone()
1100        };
1101
1102        if value.is_boolean() {
1103            let bool_val = value.as_bool().unwrap_or(false);
1104            self.params
1105                .where_or
1106                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1107            return self;
1108        }
1109
1110        match compare {
1111            "between" => {
1112                self.params.where_or.push(format!(
1113                    "{}.{} between '{}' AND '{}'",
1114                    join_table, field, value[0], value[1]
1115                ));
1116            }
1117            "set" => {
1118                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1119                let mut wheredata = vec![];
1120                for item in list.iter() {
1121                    wheredata.push(format!(
1122                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1123                    ));
1124                }
1125                self.params
1126                    .where_or
1127                    .push(format!("({})", wheredata.join(" or ")));
1128            }
1129            "notin" => {
1130                let mut text = String::new();
1131                for item in value.members() {
1132                    text = format!("{text},'{item}'");
1133                }
1134                text = text.trim_start_matches(",").into();
1135                self.params
1136                    .where_or
1137                    .push(format!("{join_table}.{field} not in ({text})"));
1138            }
1139            "is" => {
1140                self.params
1141                    .where_or
1142                    .push(format!("{join_table}.{field} is {value}"));
1143            }
1144            "isnot" => {
1145                self.params
1146                    .where_or
1147                    .push(format!("{join_table}.{field} is not {value}"));
1148            }
1149            "in" => {
1150                if value.is_array() && value.is_empty() {
1151                    self.params.where_or.push("1=0".to_string());
1152                    return self;
1153                }
1154                let mut text = String::new();
1155                if value.is_array() {
1156                    for item in value.members() {
1157                        text = format!("{text},'{item}'");
1158                    }
1159                } else {
1160                    let value = value.as_str().unwrap_or("");
1161                    let value: Vec<&str> = value.split(",").collect();
1162                    for item in value.iter() {
1163                        text = format!("{text},'{item}'");
1164                    }
1165                }
1166                text = text.trim_start_matches(",").into();
1167                self.params
1168                    .where_or
1169                    .push(format!("{join_table}.{field} {compare} ({text})"));
1170            }
1171            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1172            // 用法:.where_or("tags", "json_contains", "紧急".into())
1173            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1174            "json_contains" => {
1175                if value.is_array() {
1176                    if value.is_empty() {
1177                        self.params.where_or.push("1=0".to_string());
1178                    } else {
1179                        let mut parts = vec![];
1180                        for item in value.members() {
1181                            let escaped = super::sql_safety::escape_string(&item.to_string());
1182                            parts.push(format!(
1183                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1184                                escaped
1185                            ));
1186                        }
1187                        self.params
1188                            .where_or
1189                            .push(format!("({})", parts.join(" OR ")));
1190                    }
1191                } else {
1192                    let escaped = super::sql_safety::escape_string(&value.to_string());
1193                    self.params.where_or.push(format!(
1194                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1195                        escaped
1196                    ));
1197                }
1198            }
1199            _ => {
1200                self.params
1201                    .where_or
1202                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1203            }
1204        }
1205        self
1206    }
1207
1208    fn where_raw(&mut self, expr: &str) -> &mut Self {
1209        self.params.where_and.push(expr.to_string());
1210        self
1211    }
1212
1213    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1214        self.params
1215            .where_and
1216            .push(format!("\"{field}\" IN ({sub_sql})"));
1217        self
1218    }
1219
1220    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1221        self.params
1222            .where_and
1223            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1224        self
1225    }
1226
1227    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1228        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1229        self
1230    }
1231
1232    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1233        self.params
1234            .where_and
1235            .push(format!("NOT EXISTS ({sub_sql})"));
1236        self
1237    }
1238
1239    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1240        self.params.where_column = format!(
1241            "{}.{} {} {}.{}",
1242            self.params.table, field_a, compare, self.params.table, field_b
1243        );
1244        self
1245    }
1246
1247    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1248        self.params
1249            .update_column
1250            .push(format!("{field_a} = {compare}"));
1251        self
1252    }
1253
1254    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1255        self.params.page = page;
1256        self.params.limit = limit;
1257        self
1258    }
1259
1260    fn limit(&mut self, count: i32) -> &mut Self {
1261        self.params.limit_only = count;
1262        self
1263    }
1264
1265    fn column(&mut self, field: &str) -> JsonValue {
1266        self.field(field);
1267        let sql = self.params.select_sql();
1268
1269        if self.params.sql {
1270            return JsonValue::from(sql);
1271        }
1272        let (state, data) = self.query(sql.as_str());
1273        match state {
1274            true => {
1275                let mut list = array![];
1276                for item in data.members() {
1277                    if self.params.json[field].is_empty() {
1278                        let _ = list.push(item[field].clone());
1279                    } else {
1280                        let data =
1281                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1282                        let _ = list.push(data);
1283                    }
1284                }
1285                list
1286            }
1287            false => {
1288                array![]
1289            }
1290        }
1291    }
1292
1293    fn count(&mut self) -> JsonValue {
1294        self.params.fields = json::object! {};
1295        self.params.fields["count"] = "count(*) as count".to_string().into();
1296        let sql = self.params.select_sql();
1297        if self.params.sql {
1298            return JsonValue::from(sql.clone());
1299        }
1300        let (state, data) = self.query(sql.as_str());
1301        if state {
1302            data[0]["count"].clone()
1303        } else {
1304            JsonValue::from(0)
1305        }
1306    }
1307
1308    fn max(&mut self, field: &str) -> JsonValue {
1309        self.params.fields[field] = format!("max({field}) as {field}").into();
1310        let sql = self.params.select_sql();
1311        if self.params.sql {
1312            return JsonValue::from(sql.clone());
1313        }
1314        let (state, data) = self.query(sql.as_str());
1315        if state {
1316            if data.len() > 1 {
1317                return data.clone();
1318            }
1319            data[0][field].clone()
1320        } else {
1321            JsonValue::from(0)
1322        }
1323    }
1324
1325    fn min(&mut self, field: &str) -> JsonValue {
1326        self.params.fields[field] = format!("min({field}) as {field}").into();
1327        let sql = self.params.select_sql();
1328        if self.params.sql {
1329            return JsonValue::from(sql.clone());
1330        }
1331        let (state, data) = self.query(sql.as_str());
1332        if state {
1333            if data.len() > 1 {
1334                return data;
1335            }
1336            data[0][field].clone()
1337        } else {
1338            JsonValue::from(0)
1339        }
1340    }
1341
1342    fn sum(&mut self, field: &str) -> JsonValue {
1343        self.params.fields[field] = format!("sum({field}) as {field}").into();
1344        let sql = self.params.select_sql();
1345        if self.params.sql {
1346            return JsonValue::from(sql.clone());
1347        }
1348        let (state, data) = self.query(sql.as_str());
1349        match state {
1350            true => {
1351                if data.len() > 1 {
1352                    return data;
1353                }
1354                data[0][field].clone()
1355            }
1356            false => JsonValue::from(0),
1357        }
1358    }
1359
1360    fn avg(&mut self, field: &str) -> JsonValue {
1361        self.params.fields[field] = format!("avg({field}) as {field}").into();
1362        let sql = self.params.select_sql();
1363        if self.params.sql {
1364            return JsonValue::from(sql.clone());
1365        }
1366        let (state, data) = self.query(sql.as_str());
1367        if state {
1368            if data.len() > 1 {
1369                return data;
1370            }
1371            data[0][field].clone()
1372        } else {
1373            JsonValue::from(0)
1374        }
1375    }
1376
1377    fn having(&mut self, expr: &str) -> &mut Self {
1378        self.params.having.push(expr.to_string());
1379        self
1380    }
1381
1382    fn select(&mut self) -> JsonValue {
1383        let sql = self.params.select_sql();
1384        if self.params.sql {
1385            return JsonValue::from(sql.clone());
1386        }
1387        let (state, mut data) = self.query(sql.as_str());
1388        match state {
1389            true => {
1390                for (field, _) in self.params.json.entries() {
1391                    for item in data.members_mut() {
1392                        if !item[field].is_empty() {
1393                            let json = item[field].to_string();
1394                            item[field] = match json::parse(&json) {
1395                                Ok(e) => e,
1396                                Err(_) => JsonValue::from(json),
1397                            };
1398                        }
1399                    }
1400                }
1401                data.clone()
1402            }
1403            false => array![],
1404        }
1405    }
1406
1407    fn find(&mut self) -> JsonValue {
1408        self.params.page = 1;
1409        self.params.limit = 1;
1410        let sql = self.params.select_sql();
1411        if self.params.sql {
1412            return JsonValue::from(sql.clone());
1413        }
1414        let (state, mut data) = self.query(sql.as_str());
1415        match state {
1416            true => {
1417                if data.is_empty() {
1418                    return object! {};
1419                }
1420                for (field, _) in self.params.json.entries() {
1421                    if !data[0][field].is_empty() {
1422                        let json = data[0][field].to_string();
1423                        let json = json::parse(&json).unwrap_or(array![]);
1424                        data[0][field] = json;
1425                    } else {
1426                        data[0][field] = array![];
1427                    }
1428                }
1429                data[0].clone()
1430            }
1431            false => {
1432                object! {}
1433            }
1434        }
1435    }
1436
1437    fn value(&mut self, field: &str) -> JsonValue {
1438        self.params.fields = object! {};
1439        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1440        self.params.page = 1;
1441        self.params.limit = 1;
1442        let sql = self.params.select_sql();
1443        if self.params.sql {
1444            return JsonValue::from(sql.clone());
1445        }
1446        let (state, mut data) = self.query(sql.as_str());
1447        match state {
1448            true => {
1449                for (field, _) in self.params.json.entries() {
1450                    if !data[0][field].is_empty() {
1451                        let json = data[0][field].to_string();
1452                        let json = json::parse(&json).unwrap_or(array![]);
1453                        data[0][field] = json;
1454                    } else {
1455                        data[0][field] = array![];
1456                    }
1457                }
1458                data[0][field].clone()
1459            }
1460            false => {
1461                if self.connection.debug {
1462                    info!("{data:?}");
1463                }
1464                JsonValue::Null
1465            }
1466        }
1467    }
1468
1469    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1470        let fields_list = self.table_info(&self.params.table.clone());
1471        let mut fields = vec![];
1472        let mut values = vec![];
1473        if !self.params.autoinc && data["id"].is_empty() {
1474            let thread_id = format!("{:?}", std::thread::current().id());
1475            let thread_num: u64 = thread_id
1476                .trim_start_matches("ThreadId(")
1477                .trim_end_matches(")")
1478                .parse()
1479                .unwrap_or(0);
1480            data["id"] = format!(
1481                "{:X}{:X}",
1482                Local::now().timestamp_nanos_opt().unwrap_or(0),
1483                thread_num
1484            )
1485            .into();
1486        }
1487        for (field, value) in data.entries() {
1488            fields.push(format!("\"{}\"", field));
1489
1490            if value.is_string() {
1491                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1492                continue;
1493            } else if value.is_array() {
1494                if self.params.json[field].is_empty() {
1495                    let array = value
1496                        .members()
1497                        .map(|x| x.as_str().unwrap_or(""))
1498                        .collect::<Vec<&str>>()
1499                        .join(",");
1500                    values.push(format!("'{}'", array.replace("'", "''")));
1501                } else {
1502                    let json = value.to_string();
1503                    let json = json.replace("'", "''");
1504                    values.push(format!("'{json}'"));
1505                }
1506                continue;
1507            } else if value.is_object() {
1508                if self.params.json[field].is_empty() {
1509                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1510                } else {
1511                    let json = value.to_string();
1512                    let json = json.replace("'", "''");
1513                    values.push(format!("'{json}'"));
1514                }
1515                continue;
1516            } else if value.is_number() {
1517                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1518                if col_type == "boolean" {
1519                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1520                    values.push(format!("{bool_val}"));
1521                } else if col_type.contains("int") {
1522                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1523                } else {
1524                    values.push(format!("{value}"));
1525                }
1526                continue;
1527            } else if value.is_boolean() || value.is_null() {
1528                values.push(format!("{value}"));
1529                continue;
1530            } else {
1531                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1532                continue;
1533            }
1534        }
1535        let fields = fields.join(",");
1536        let values = values.join(",");
1537
1538        let sql = format!(
1539            "INSERT INTO {} ({}) VALUES ({});",
1540            self.params.table, fields, values
1541        );
1542        if self.params.sql {
1543            return JsonValue::from(sql.clone());
1544        }
1545        let (state, ids) = self.execute(sql.as_str());
1546
1547        match state {
1548            true => match self.params.autoinc {
1549                true => ids.clone(),
1550                false => data["id"].clone(),
1551            },
1552            false => {
1553                let thread_id = format!("{:?}", thread::current().id());
1554                error!("添加失败: {thread_id} {ids:?} {sql}");
1555                JsonValue::from("")
1556            }
1557        }
1558    }
1559    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1560        let fields_list = self.table_info(&self.params.table.clone());
1561        let mut fields = String::new();
1562        if !self.params.autoinc && data[0]["id"].is_empty() {
1563            data[0]["id"] = "".into();
1564        }
1565        for (field, _) in data[0].entries() {
1566            fields = format!("{fields},\"{field}\"");
1567        }
1568        fields = fields.trim_start_matches(",").to_string();
1569
1570        let core_count = num_cpus::get();
1571        let mut p = pools::Pool::new(core_count * 4);
1572
1573        let autoinc = self.params.autoinc;
1574        for list in data.members() {
1575            let mut item = list.clone();
1576            let i = br_fields::str::Code::verification_code(3);
1577            let fields_list_new = fields_list.clone();
1578            p.execute(move |pcindex| {
1579                if !autoinc && item["id"].is_empty() {
1580                    let id = format!(
1581                        "{:X}{:X}{}",
1582                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1583                        pcindex,
1584                        i
1585                    );
1586                    item["id"] = id.into();
1587                }
1588                let mut values = "".to_string();
1589                for (field, value) in item.entries() {
1590                    if value.is_string() {
1591                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1592                    } else if value.is_number() {
1593                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1594                        if col_type == "boolean" {
1595                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1596                            values = format!("{values},{bool_val}");
1597                        } else if col_type.contains("int") {
1598                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1599                        } else {
1600                            values = format!("{values},{value}");
1601                        }
1602                    } else if value.is_boolean() {
1603                        values = format!("{values},{value}");
1604                        continue;
1605                    } else {
1606                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1607                    }
1608                }
1609                values = format!("({})", values.trim_start_matches(","));
1610                array![item["id"].clone(), values]
1611            });
1612        }
1613        let (ids_list, mut values) = p.insert_all();
1614        values = values.trim_start_matches(",").to_string();
1615        let sql = format!(
1616            "INSERT INTO {} ({}) VALUES {};",
1617            self.params.table, fields, values
1618        );
1619
1620        if self.params.sql {
1621            return JsonValue::from(sql.clone());
1622        }
1623        let (state, data) = self.execute(sql.as_str());
1624        match state {
1625            true => match autoinc {
1626                true => data,
1627                false => JsonValue::from(ids_list),
1628            },
1629            false => {
1630                error!("insert_all: {data:?}");
1631
1632                array![]
1633            }
1634        }
1635    }
1636    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1637        let fields_list = self.table_info(&self.params.table.clone());
1638        let mut fields = vec![];
1639        let mut values = vec![];
1640        if !self.params.autoinc && data["id"].is_empty() {
1641            let thread_id = format!("{:?}", std::thread::current().id());
1642            let thread_num: u64 = thread_id
1643                .trim_start_matches("ThreadId(")
1644                .trim_end_matches(")")
1645                .parse()
1646                .unwrap_or(0);
1647            data["id"] = format!(
1648                "{:X}{:X}",
1649                Local::now().timestamp_nanos_opt().unwrap_or(0),
1650                thread_num
1651            )
1652            .into();
1653        }
1654        for (field, value) in data.entries() {
1655            fields.push(format!("\"{}\"", field));
1656
1657            if value.is_string() {
1658                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1659                continue;
1660            } else if value.is_array() {
1661                if self.params.json[field].is_empty() {
1662                    let array = value
1663                        .members()
1664                        .map(|x| x.as_str().unwrap_or(""))
1665                        .collect::<Vec<&str>>()
1666                        .join(",");
1667                    values.push(format!("'{}'", array.replace("'", "''")));
1668                } else {
1669                    let json = value.to_string();
1670                    let json = json.replace("'", "''");
1671                    values.push(format!("'{json}'"));
1672                }
1673                continue;
1674            } else if value.is_object() {
1675                if self.params.json[field].is_empty() {
1676                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1677                } else {
1678                    let json = value.to_string();
1679                    let json = json.replace("'", "''");
1680                    values.push(format!("'{json}'"));
1681                }
1682                continue;
1683            } else if value.is_number() {
1684                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1685                if col_type == "boolean" {
1686                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1687                    values.push(format!("{bool_val}"));
1688                } else if col_type.contains("int") {
1689                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1690                } else {
1691                    values.push(format!("{value}"));
1692                }
1693                continue;
1694            } else if value.is_boolean() || value.is_null() {
1695                values.push(format!("{value}"));
1696                continue;
1697            } else {
1698                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1699                continue;
1700            }
1701        }
1702
1703        let conflict_cols: Vec<String> = conflict_fields
1704            .iter()
1705            .map(|f| format!("\"{}\"", f))
1706            .collect();
1707
1708        let update_set: Vec<String> = fields
1709            .iter()
1710            .filter(|f| {
1711                let name = f.trim_matches('"');
1712                !conflict_fields.contains(&name) && name != "id"
1713            })
1714            .map(|f| format!("{f}=EXCLUDED.{f}"))
1715            .collect();
1716
1717        let fields_str = fields.join(",");
1718        let values_str = values.join(",");
1719
1720        let sql = format!(
1721            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1722            self.params.table,
1723            fields_str,
1724            values_str,
1725            conflict_cols.join(","),
1726            update_set.join(",")
1727        );
1728        if self.params.sql {
1729            return JsonValue::from(sql.clone());
1730        }
1731        let (state, result) = self.execute(sql.as_str());
1732        match state {
1733            true => match self.params.autoinc {
1734                true => result.clone(),
1735                false => data["id"].clone(),
1736            },
1737            false => {
1738                let thread_id = format!("{:?}", thread::current().id());
1739                error!("upsert失败: {thread_id} {result:?} {sql}");
1740                JsonValue::from("")
1741            }
1742        }
1743    }
1744    fn update(&mut self, data: JsonValue) -> JsonValue {
1745        let fields_list = self.table_info(&self.params.table.clone());
1746        let mut values = vec![];
1747        for (field, value) in data.entries() {
1748            if value.is_string() {
1749                values.push(format!(
1750                    "\"{}\"='{}'",
1751                    field,
1752                    value.to_string().replace("'", "''")
1753                ));
1754            } else if value.is_number() {
1755                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1756                if col_type == "boolean" {
1757                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1758                    values.push(format!("\"{field}\"= {bool_val}"));
1759                } else if col_type.contains("int") {
1760                    values.push(format!(
1761                        "\"{}\"= {}",
1762                        field,
1763                        value.as_f64().unwrap_or(0.0) as i64
1764                    ));
1765                } else {
1766                    values.push(format!("\"{field}\"= {value}"));
1767                }
1768            } else if value.is_array() {
1769                if self.params.json[field].is_empty() {
1770                    let array = value
1771                        .members()
1772                        .map(|x| x.as_str().unwrap_or(""))
1773                        .collect::<Vec<&str>>()
1774                        .join(",");
1775                    values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1776                } else {
1777                    let json = value.to_string();
1778                    let json = json.replace("'", "''");
1779                    values.push(format!("\"{field}\"='{json}'"));
1780                }
1781                continue;
1782            } else if value.is_object() {
1783                if self.params.json[field].is_empty() {
1784                    values.push(format!(
1785                        "\"{}\"='{}'",
1786                        field,
1787                        value.to_string().replace("'", "''")
1788                    ));
1789                } else {
1790                    if value.is_empty() {
1791                        values.push(format!("\"{field}\"=''"));
1792                        continue;
1793                    }
1794                    let json = value.to_string();
1795                    let json = json.replace("'", "''");
1796                    values.push(format!("\"{field}\"='{json}'"));
1797                }
1798                continue;
1799            } else if value.is_boolean() || value.is_null() {
1800                values.push(format!("\"{field}\"= {value}"));
1801            } else {
1802                values.push(format!("\"{field}\"=\"{value}\""));
1803            }
1804        }
1805
1806        for (field, value) in self.params.inc_dec.entries() {
1807            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1808        }
1809        if !self.params.update_column.is_empty() {
1810            values.extend(self.params.update_column.clone());
1811        }
1812        let values = values.join(",");
1813
1814        let sql = format!(
1815            "UPDATE {} SET {} {};",
1816            self.params.table.clone(),
1817            values,
1818            self.params.where_sql()
1819        );
1820        if self.params.sql {
1821            return JsonValue::from(sql.clone());
1822        }
1823        let (state, data) = self.execute(sql.as_str());
1824        if state {
1825            data
1826        } else {
1827            let thread_id = format!("{:?}", thread::current().id());
1828            error!("update: {thread_id} {data:?} {sql}");
1829            0.into()
1830        }
1831    }
1832    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1833        let fields_list = self.table_info(&self.params.table.clone());
1834        let mut values = vec![];
1835
1836        let mut ids = vec![];
1837        for (field, _) in data[0].entries() {
1838            if field == "id" {
1839                continue;
1840            }
1841            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1842            let mut fields = vec![];
1843            for row in data.members() {
1844                let value = row[field].clone();
1845                let id = row["id"].clone();
1846                ids.push(id.clone());
1847                if value.is_string() {
1848                    fields.push(format!(
1849                        "WHEN '{}' THEN '{}'",
1850                        id,
1851                        value.to_string().replace("'", "''")
1852                    ));
1853                } else if value.is_array() || value.is_object() {
1854                    if self.params.json[field].is_empty() {
1855                        fields.push(format!(
1856                            "WHEN '{}' THEN '{}'",
1857                            id,
1858                            value.to_string().replace("'", "''")
1859                        ));
1860                    } else {
1861                        let json = value.to_string();
1862                        let json = json.replace("'", "''");
1863                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1864                    }
1865                    continue;
1866                } else if value.is_number() {
1867                    if col_type == "boolean" {
1868                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1869                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1870                    } else {
1871                        fields.push(format!("WHEN '{id}' THEN {value}"));
1872                    }
1873                } else if value.is_boolean() || value.is_null() {
1874                    fields.push(format!("WHEN '{id}' THEN {value}"));
1875                } else {
1876                    fields.push(format!(
1877                        "WHEN '{}' THEN '{}'",
1878                        id,
1879                        value.to_string().replace("'", "''")
1880                    ));
1881                }
1882            }
1883            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1884        }
1885        self.where_and("id", "in", ids.into());
1886        for (field, value) in self.params.inc_dec.entries() {
1887            values.push(format!("{} = {}", field, value.to_string().clone()));
1888        }
1889
1890        let values = values.join(",");
1891        let sql = format!(
1892            "UPDATE {} SET {} {} {};",
1893            self.params.table.clone(),
1894            values,
1895            self.params.where_sql(),
1896            self.params.page_limit_sql()
1897        );
1898        if self.params.sql {
1899            return JsonValue::from(sql.clone());
1900        }
1901        let (state, data) = self.execute(sql.as_str());
1902        if state {
1903            data
1904        } else {
1905            error!("update_all: {data:?}");
1906            JsonValue::from(0)
1907        }
1908    }
1909
1910    fn delete(&mut self) -> JsonValue {
1911        let sql = format!(
1912            "delete FROM {} {} {};",
1913            self.params.table.clone(),
1914            self.params.where_sql(),
1915            self.params.page_limit_sql()
1916        );
1917        if self.params.sql {
1918            return JsonValue::from(sql.clone());
1919        }
1920        let (state, data) = self.execute(sql.as_str());
1921        match state {
1922            true => data,
1923            false => {
1924                error!("delete 失败>>> {data:?}");
1925                JsonValue::from(0)
1926            }
1927        }
1928    }
1929
1930    fn transaction(&mut self) -> bool {
1931        let thread_id = format!("{:?}", thread::current().id());
1932        let key = format!("{}{}", self.default, thread_id);
1933
1934        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1935            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1936            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1937            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1938            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1939            return true;
1940        }
1941
1942        // 获取事务连接,失败时重试2次(短退避)
1943        let mut conn = None;
1944        for attempt in 0..3u8 {
1945            match self.client.get_connect_for_transaction() {
1946                Ok(mut c) => {
1947                    if c.is_valid() {
1948                        conn = Some(c);
1949                        break;
1950                    }
1951                    warn!("事务连接无效(第{}次)", attempt + 1);
1952                    self.client.release_transaction_conn();
1953                }
1954                Err(e) => {
1955                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1956                }
1957            }
1958            if attempt < 2 {
1959                thread::sleep(std::time::Duration::from_millis(200));
1960            }
1961        }
1962        let mut conn = match conn {
1963            Some(c) => c,
1964            None => {
1965                error!("获取事务连接重试耗尽");
1966                return false;
1967            }
1968        };
1969
1970        if let Err(e) = conn.execute("START TRANSACTION") {
1971            error!("启动事务失败: {e}");
1972            self.client.release_transaction_conn();
1973            return false;
1974        }
1975
1976        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1977        true
1978    }
1979    fn commit(&mut self) -> bool {
1980        let thread_id = format!("{:?}", thread::current().id());
1981        let key = format!("{}{}", self.default, thread_id);
1982
1983        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1984            error!("commit: 没有活跃的事务");
1985            return false;
1986        }
1987
1988        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1989        if depth > 1 {
1990            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1991            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1992            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &key);
1993            return true;
1994        }
1995
1996        let commit_result =
1997            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1998
1999        let success = match commit_result {
2000            Some(Ok(_)) => true,
2001            Some(Err(e)) => {
2002                error!("提交事务失败: {e}");
2003                false
2004            }
2005            None => {
2006                error!("提交事务失败: 未找到连接");
2007                false
2008            }
2009        };
2010
2011        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &key) {
2012            self.client.release_transaction_conn_with_conn(conn);
2013        } else {
2014            self.client.release_transaction_conn();
2015        }
2016        success
2017    }
2018
2019    fn rollback(&mut self) -> bool {
2020        let thread_id = format!("{:?}", thread::current().id());
2021        let key = format!("{}{}", self.default, thread_id);
2022
2023        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
2024            error!("rollback: 没有活跃的事务");
2025            return false;
2026        }
2027
2028        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
2029        if depth > 1 {
2030            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
2031            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
2032            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &key);
2033            return true;
2034        }
2035
2036        let rollback_result =
2037            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
2038
2039        let success = match rollback_result {
2040            Some(Ok(_)) => true,
2041            Some(Err(e)) => {
2042                error!("回滚失败: {e}");
2043                false
2044            }
2045            None => {
2046                error!("回滚失败: 未找到连接");
2047                false
2048            }
2049        };
2050
2051        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &key) {
2052            self.client.release_transaction_conn_with_conn(conn);
2053        } else {
2054            self.client.release_transaction_conn();
2055        }
2056        success
2057    }
2058
2059    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2060        let (state, data) = self.query(sql);
2061        match state {
2062            true => Ok(data),
2063            false => Err(data.to_string()),
2064        }
2065    }
2066
2067    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2068        self.params = Params::default(self.connection.mode.str().as_str());
2069        let (state, data) = self.execute(sql);
2070        match state {
2071            true => Ok(data),
2072            false => Err(data.to_string()),
2073        }
2074    }
2075
2076    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2077        self.params.inc_dec[field] = format!("{field} + {num}").into();
2078        self
2079    }
2080
2081    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2082        self.params.inc_dec[field] = format!("{field} - {num}").into();
2083        self
2084    }
2085    fn buildsql(&mut self) -> String {
2086        self.fetch_sql();
2087        let sql = self.select().to_string();
2088        format!("( {} ) {}", sql, self.params.table)
2089    }
2090
2091    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2092        for field in fields {
2093            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2094        }
2095        self
2096    }
2097
2098    fn join(
2099        &mut self,
2100        main_table: &str,
2101        main_fields: &str,
2102        right_table: &str,
2103        right_fields: &str,
2104    ) -> &mut Self {
2105        let main_table = if main_table.is_empty() {
2106            self.params.table.clone()
2107        } else {
2108            main_table.to_string()
2109        };
2110        self.params.join_table = right_table.to_string();
2111        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2112        self
2113    }
2114
2115    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2116        let main_fields = if main_fields.is_empty() {
2117            "id"
2118        } else {
2119            main_fields
2120        };
2121        let second_fields = if second_fields.is_empty() {
2122            self.params.table.clone()
2123        } else {
2124            second_fields.to_string().clone()
2125        };
2126        let sec_table_name = format!("{}{}", table, "_2");
2127        let second_table = format!("{} {}", table, sec_table_name.clone());
2128        self.params.join_table = sec_table_name.clone();
2129        self.params.join.push(format!(
2130            " INNER JOIN {} ON {}.{} = {}.{}",
2131            second_table, self.params.table, main_fields, sec_table_name, second_fields
2132        ));
2133        self
2134    }
2135
2136    fn join_right(
2137        &mut self,
2138        main_table: &str,
2139        main_fields: &str,
2140        right_table: &str,
2141        right_fields: &str,
2142    ) -> &mut Self {
2143        let main_table = if main_table.is_empty() {
2144            self.params.table.clone()
2145        } else {
2146            main_table.to_string()
2147        };
2148        self.params.join_table = right_table.to_string();
2149        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2150        self
2151    }
2152
2153    fn join_full(
2154        &mut self,
2155        main_table: &str,
2156        main_fields: &str,
2157        right_table: &str,
2158        right_fields: &str,
2159    ) -> &mut Self {
2160        let main_table = if main_table.is_empty() {
2161            self.params.table.clone()
2162        } else {
2163            main_table.to_string()
2164        };
2165        self.params.join_table = right_table.to_string();
2166        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2167        self
2168    }
2169
2170    fn union(&mut self, sub_sql: &str) -> &mut Self {
2171        self.params.unions.push(format!("UNION {sub_sql}"));
2172        self
2173    }
2174
2175    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2176        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2177        self
2178    }
2179
2180    fn lock_for_update(&mut self) -> &mut Self {
2181        self.params.lock_mode = "FOR UPDATE".to_string();
2182        self
2183    }
2184
2185    fn lock_for_share(&mut self) -> &mut Self {
2186        self.params.lock_mode = "FOR SHARE".to_string();
2187        self
2188    }
2189}