br_db/types/
pgsql.rs

1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use std::sync::Mutex;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use br_pgsql::connect::Connect;
12use br_pgsql::pools::Pools;
13use br_pgsql::{PoolConstraints, PoolOpts};
14
15/// PostgreSQL 标识符最大长度(NAMEDATALEN - 1)
16const PG_MAX_IDENTIFIER_LENGTH: usize = 63;
17
18/// 生成 PostgreSQL 索引名称
19/// 如果名称超过 63 个字符,使用 MD5 哈希缩短
20fn generate_index_name(base_name: &str) -> String {
21    if base_name.len() <= PG_MAX_IDENTIFIER_LENGTH {
22        base_name.to_string()
23    } else {
24        // 使用 MD5 哈希生成固定长度的名称
25        let md5 = br_crypto::md5::encrypt_hex(base_name.as_bytes());
26        // 保留表名前缀的一部分,然后加上哈希值
27        // 格式:{table}_hash_{hash},确保总长度不超过 63 个字符
28        if let Some(underscore_pos) = base_name.find('_') {
29            // 计算可用长度:63 - "_hash_" (6) - MD5 前 16 个字符 (16) = 最多 41 个字符用于前缀
30            let max_prefix_len = PG_MAX_IDENTIFIER_LENGTH - 6 - 16;
31            let prefix = &base_name[..underscore_pos.min(max_prefix_len)];
32            format!("{}_hash_{}", prefix, &md5[..16])
33        } else {
34            // 如果找不到下划线,直接使用哈希值
35            format!("idx_{}", &md5[..(PG_MAX_IDENTIFIER_LENGTH - 4)])
36        }
37    }
38}
39
40
41lazy_static! {
42   static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
43   static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
44   static ref TABLE_FIELDS: Arc<Mutex<HashMap<String, JsonValue>>> = Arc::new(Mutex::new(HashMap::new()));
45}
46#[derive(Clone)]
47pub struct Pgsql {
48    /// 当前连接配置
49    pub connection: Connection,
50    /// 当前选中配置
51    pub default: String,
52    pub params: Params,
53    pub client: Pools,
54}
55
56impl Pgsql {
57    /// 连接数据库(参考 mysql 的实现方式)
58    /// 使用 PoolConstraints 和 PoolOpts 配置连接池
59    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
60        let port = connection.hostport.parse::<i32>()
61            .map_err(|e| format!("端口号解析失败: {} ({})", connection.hostport, e))?;
62
63        let cp_connection = connection.clone();
64        let config = object! {
65            debug: cp_connection.debug,
66            username: cp_connection.username,
67            userpass: cp_connection.userpass,
68            database: cp_connection.database,
69            hostname: cp_connection.hostname,
70            hostport: port,
71            charset: cp_connection.charset.str(),
72        };
73        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
74
75        // 配置连接池约束(参考 mysql 的实现)
76        // 从配置中读取 pool_max,如果没有则使用默认值 400(与 mysql 保持一致)
77        let max_pools = config["pool_max"].as_u32()
78            .filter(|&p| p > 0 && p <= 1000)
79            .unwrap_or(400) as usize;
80        
81        let constraints = PoolConstraints::new(0, max_pools)
82            .map_err(|e| format!("连接池约束配置失败: {}", e))?;
83        
84        // 配置连接池选项(参考 mysql 的实现)
85        let pool_opts = PoolOpts::default()
86            .with_constraints(constraints)
87            .with_reset_connection(true)
88            .with_connect_timeout(Duration::from_secs(5))
89            .with_read_timeout(Duration::from_secs(15))
90            .with_write_timeout(Duration::from_secs(20))
91            .with_tcp_keepalive(Duration::from_secs(5));
92
93        // 使用新的连接池配置方式(参考 mysql 的实现)
94        let pools = pgsql.pools_with_opts(pool_opts)?;
95        
96        Ok(Self {
97            connection,
98            default: default.clone(),
99            params: Params::default("pgsql"),
100            client: pools,
101        })
102    }
103
104    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
105        let thread_id = format!("{:?}", thread::current().id());
106        let key = format!("{}{}", self.default, thread_id);
107
108        // === 事务环境 ===
109        // 修复:在单次锁持有期间同时检查 TRANS 和 TR,避免检查-使用时间窗口问题
110        let db_opt = {
111            let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
112            if trans_map.get(&*thread_id).is_some() {
113                // 在事务环境中,同时获取 TR 锁查找连接
114                let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
115                tr_map.get(&key).cloned()
116            } else {
117                None
118            }
119        };
120
121        if let Some(db) = db_opt {
122            // 事务环境:使用事务连接
123            let mut t = db.lock().unwrap();
124            match t.query(sql) {
125                Ok(e) => {
126                    // 查询成功(已移除正常日志)
127                    (true, e.rows)
128                }
129                Err(e) => {
130                    error!(
131                        "事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
132                        thread_id,
133                        self.default,
134                        self.connection.database,
135                        sql,
136                        e
137                    );
138                    (false, JsonValue::from(e.to_string()))
139                }
140            }
141        } else {
142            // 非事务环境下使用 ConnectionGuard
143            let mut guard = match self.client.get_guard() {
144                Ok(g) => g,
145                Err(e) => {
146                    error!(
147                        "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
148                        thread_id,
149                        self.default,
150                        self.connection.database,
151                        sql,
152                        e
153                    );
154                    return (false, JsonValue::from(e.to_string()));
155                }
156            };
157
158            let res = guard.conn().query(sql);
159            match res {
160                Ok(e) => {
161                    // 查询成功(已移除正常日志)
162                    (true, e.rows)
163                }
164                Err(e) => {
165                    error!(
166                        "非事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
167                        thread_id,
168                        self.default,
169                        self.connection.database,
170                        sql,
171                        e
172                    );
173                    (false, JsonValue::from(e.to_string()))
174                }
175            }
176            // guard 离开作用域时自动归还连接
177        }
178    }
179    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
180        let thread_id = format!("{:?}", thread::current().id());
181        let key = format!("{}{}", self.default, thread_id);
182
183
184        // === 事务环境 ===
185        // 修复:在单次锁持有期间同时检查 TRANS 和 TR,避免检查-使用时间窗口问题
186        let db_opt = {
187            let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
188            if trans_map.get(&*thread_id).is_some() {
189                // 在事务环境中,同时获取 TR 锁查找连接
190                let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
191                tr_map.get(&key).cloned()
192            } else {
193                None
194            }
195        };
196
197        if let Some(db) = db_opt {
198            // 事务环境:使用事务连接
199            let mut t = db.lock().unwrap();
200            match t.execute(sql) {
201                Ok(e) => {
202                    // 提交成功(已移除正常日志)
203                    if sql.contains("INSERT") {
204                        (true, e.rows)
205                    } else {
206                        (true, e.affect_count.into())
207                    }
208                }
209                Err(e) => {
210                    let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
211                        "插入"
212                    } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
213                        "更新"
214                    } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
215                        "删除"
216                    } else {
217                        "执行"
218                    };
219                    error!(
220                        "事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
221                        operation_type,
222                        thread_id,
223                        self.default,
224                        self.connection.database,
225                        self.params.table,
226                        sql,
227                        e
228                    );
229                    (false, JsonValue::from(e.to_string()))
230                }
231            }
232        } else {
233            // 非事务环境下使用 ConnectionGuard
234            let mut guard = match self.client.get_guard() {
235                Ok(g) => g,
236                Err(e) => {
237                    error!(
238                        "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
239                        thread_id,
240                        self.default,
241                        self.connection.database,
242                        sql,
243                        e
244                    );
245                    return (false, JsonValue::from(e.to_string()));
246                }
247            };
248
249            let res = guard.conn().execute(sql);
250            match res {
251                Ok(e) => {
252                    // 提交成功(已移除正常日志)
253                    if sql.contains("INSERT") {
254                        (true, e.rows)
255                    } else {
256                        (true, e.affect_count.into())
257                    }
258                }
259                Err(e) => {
260                    let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
261                        "插入"
262                    } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
263                        "更新"
264                    } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
265                        "删除"
266                    } else {
267                        "执行"
268                    };
269                    error!(
270                        "非事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
271                        operation_type,
272                        thread_id,
273                        self.default,
274                        self.connection.database,
275                        self.params.table,
276                        sql,
277                        e
278                    );
279                    (false, JsonValue::from(e.to_string()))
280                }
281            }
282            // guard 离开作用域时自动归还连接
283        }
284    }
285}
286
287impl DbMode for Pgsql {
288    fn database_tables(&mut self) -> JsonValue {
289        let sql = "SELECT table_name FROM information_schema.tables 
290             WHERE table_schema = 'public' AND table_type = 'BASE TABLE'".to_string();
291        match self.sql(sql.as_str()) {
292            Ok(e) => {
293                let mut list = vec![];
294                for item in e.members() {
295                    if let Some(value) = item["table_name"].as_str() {
296                        list.push(JsonValue::from(value));
297                    }
298                }
299                list.into()
300            }
301            Err(_) => {
302                array![]
303            }
304        }
305    }
306
307    fn database_create(&mut self, name: &str) -> bool {
308        // 先检查数据库是否已存在
309        let check_sql = format!(
310            "SELECT 1 FROM pg_database WHERE datname = '{}'",
311            name
312        );
313        let (exists_state, exists_data) = self.query(check_sql.as_str());
314        
315        if exists_state && !exists_data.is_empty() && exists_data.members().count() > 0 {
316            // 数据库已存在,返回 true
317            return true;
318        }
319        
320        // 转义数据库名称(PostgreSQL 使用双引号)
321        let db_name = format!("\"{}\"", name);
322        
323        // 构建 CREATE DATABASE SQL
324        let mut sql = format!("CREATE DATABASE {}", db_name);
325        
326        // 添加字符集配置(PostgreSQL 使用 ENCODING)
327        if !self.connection.charset.str().is_empty() {
328            let charset_str = self.connection.charset.str();
329            let encoding = match charset_str.as_str() {
330                "utf8" | "utf8mb4" => "UTF8",
331                "latin1" => "LATIN1",
332                _ => "UTF8",
333            };
334            sql = format!("{} ENCODING '{}'", sql, encoding);
335        }
336        
337        let (state, data) = self.execute(sql.as_str());
338        match state {
339            true => true, // PostgreSQL 的 CREATE DATABASE 成功时返回空结果
340            false => {
341                error!("创建数据库失败: {data:?}");
342                false
343            }
344        }
345    }
346
347    fn database_update(&mut self, name: &str, options: JsonValue) -> bool {
348        let db_name = format!("\"{}\"", name);
349        let mut sql_parts = vec![];
350        
351        // 更新字符集
352        if options.has_key("encoding") {
353            let encoding = options["encoding"].as_str().unwrap_or("UTF8");
354            sql_parts.push(format!("ENCODING '{}'", encoding));
355        }
356        
357        // 更新所有者
358        if options.has_key("owner") {
359            let owner = options["owner"].as_str().unwrap_or("");
360            sql_parts.push(format!("OWNER = \"{}\"", owner));
361        }
362        
363        if sql_parts.is_empty() {
364            return true; // 没有需要更新的内容
365        }
366        
367        let sql = format!("ALTER DATABASE {} {}", db_name, sql_parts.join(" "));
368        let (state, data) = self.execute(sql.as_str());
369        
370        match state {
371            true => true,
372            false => {
373                error!("更新数据库失败: {data:?}");
374                false
375            }
376        }
377    }
378}
379
380impl Mode for Pgsql {
381    fn transaction(&mut self) -> bool {
382        let thread_id = format!("{:?}", thread::current().id());
383
384        // 修复:在单次锁持有期间完成嵌套计数检查和更新,保证原子性
385        let is_nested = {
386            let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
387            if let Some(count) = trans_map.get_mut(&*thread_id) {
388                // 嵌套事务:增加计数(在同一把锁内完成)
389                *count += 1;
390                true
391            } else {
392                // 新事务:初始化计数
393                trans_map.insert(thread_id.clone(), 1);
394                false
395            }
396        };
397
398        if is_nested {
399            return true;
400        }
401
402        let key = format!("{}{}", self.default, thread_id);
403
404        // 获取连接并克隆(事务需要长期持有)
405        let mut guard = match self.client.get_guard() {
406            Ok(g) => g,
407            Err(e) => {
408                error!("获取事务连接失败: {e}");
409                // 清理已插入的 TRANS 记录
410                TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&*thread_id);
411                return false;
412            }
413        };
414
415        let conn = guard.conn().clone();
416        drop(guard);
417
418        TR.lock().unwrap_or_else(|e| e.into_inner()).insert(key.clone(), Arc::new(Mutex::new(conn)));
419
420        let sql = "START TRANSACTION;".to_string();
421        let (state, _) = self.execute(sql.as_str());
422        match state {
423            true => {
424                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
425                let (state, _) = self.execute(sql.as_str());
426                match state {
427                    true => state,
428                    false => {
429                        // 清理资源:先 TRANS 后 TR(保持一致的锁顺序)
430                        TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&*thread_id);
431                        TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
432                        state
433                    }
434                }
435            }
436            false => {
437                // 清理资源:先 TRANS 后 TR(保持一致的锁顺序)
438                TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&*thread_id);
439                TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
440                state
441            }
442        }
443    }
444
445    fn commit(&mut self) -> bool {
446        let thread_id = format!("{:?}", thread::current().id());
447        let key = format!("{}{}", self.default, thread_id);
448
449        // 修复:在单次锁持有期间完成嵌套计数检查和更新,保证原子性
450        let should_commit = {
451            let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
452            if let Some(count) = trans_map.get_mut(&*thread_id) {
453                if *count > 1 {
454                    // 嵌套事务:减少计数(在同一把锁内完成)
455                    *count -= 1;
456                    false // 不需要执行 COMMIT
457                } else {
458                    // 最外层事务:标记为需要执行 COMMIT
459                    true
460                }
461            } else {
462                // 事务不存在,可能是已经被清理或从未创建
463                error!("提交事务失败: 线程ID: {thread_id} 事务不存在");
464                return false;
465            }
466        };
467
468        if !should_commit {
469            return true;
470        }
471
472        // 最外层事务:执行提交
473        let sql = "COMMIT".to_string();
474        let (state, data) = self.execute(sql.as_str());
475
476        // 修复:仅在成功时清理资源,失败时保留资源以便重试或回滚
477        if state {
478            // 清理资源:先 TRANS 后 TR(保持一致的锁顺序)
479            TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&thread_id);
480            TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
481        } else {
482            let error_msg = data.as_str().map(|s| s.to_string()).unwrap_or_else(|| data.to_string());
483            error!(
484                "提交事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
485                thread_id,
486                self.default,
487                self.connection.database,
488                sql,
489                error_msg
490            );
491            // 注意:失败时不清理资源,保留 TRANS 和 TR 以便后续重试或回滚
492        }
493        state
494    }
495
496    fn rollback(&mut self) -> bool {
497        let thread_id = format!("{:?}", thread::current().id());
498        let key = format!("{}{}", self.default, thread_id);
499
500        // 修复:在单次锁持有期间完成嵌套计数检查和更新,保证原子性
501        let should_rollback = {
502            let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
503            if let Some(count) = trans_map.get_mut(&*thread_id) {
504                if *count > 1 {
505                    // 嵌套事务:减少计数(在同一把锁内完成)
506                    *count -= 1;
507                    false // 不需要执行 ROLLBACK
508                } else {
509                    // 最外层事务:标记为需要执行 ROLLBACK
510                    true
511                }
512            } else {
513                // 事务不存在,可能是已经被清理或从未创建
514                error!("回滚事务失败: 线程ID: {thread_id} 事务不存在");
515                return false;
516            }
517        };
518
519        if !should_rollback {
520            return true;
521        }
522
523        // 最外层事务:执行回滚
524        let sql = "ROLLBACK".to_string();
525        let (state, data) = self.execute(sql.as_str());
526
527        // 修复:无论成功与否都清理资源(回滚失败时也需要清理,避免资源泄漏)
528        TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&thread_id);
529        TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
530
531        if !state {
532            let error_msg = data.as_str().map(|s| s.to_string()).unwrap_or_else(|| data.to_string());
533            error!(
534                "回滚事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
535                thread_id,
536                self.default,
537                self.connection.database,
538                sql,
539                error_msg
540            );
541        }
542        state
543    }
544
545    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
546        let (state, data) = self.query(sql);
547        match state {
548            true => Ok(data),
549            false => Err(data.to_string()),
550        }
551    }
552
553    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
554        let (state, data) = self.execute(sql);
555        match state {
556            true => Ok(data),
557            false => Err(data.to_string()),
558        }
559    }
560
561    fn table_create(&mut self, options: TableOptions) -> JsonValue {
562        // 清除缓存
563        let cache_key = format!("{}{}", self.default, options.table_name);
564        if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
565            TABLE_FIELDS.lock().unwrap().remove(&cache_key);
566        }
567
568        let mut sql = String::new();
569        let mut unique = String::new();
570        let mut index = String::new();
571
572        // 处理唯一索引
573        let mut unique_fields = String::new();
574        for item in options.table_unique.iter() {
575            if unique_fields.is_empty() {
576                unique_fields = format!("\"{}\"", item);
577            } else {
578                unique_fields = format!("{}, \"{}\"", unique_fields, item);
579            }
580        }
581        if !unique_fields.is_empty() {
582            let mut unique_name = String::new();
583            for item in options.table_unique.iter() {
584                if unique_name.is_empty() {
585                    unique_name = format!("{}_unique_{}", options.table_name, item);
586                } else {
587                    unique_name = format!("{}_{}", unique_name, item);
588                }
589            }
590            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
591            unique_name = generate_index_name(&format!("unique_{}", md5));
592            unique = format!("CONSTRAINT {} UNIQUE ({})", unique_name, unique_fields);
593        }
594
595        // 处理普通索引
596        for row in options.table_index.iter() {
597            let mut index_fields = String::new();
598            let mut index_name = String::new();
599            for item in row {
600                if index_fields.is_empty() {
601                    index_fields = format!("\"{}\"", item);
602                    index_name = format!("{}_index_{}", options.table_name, item);
603                } else {
604                    index_fields = format!("{}, \"{}\"", index_fields, item);
605                    index_name = format!("{}_{}", index_name, item);
606                }
607            }
608            let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
609            index_name = generate_index_name(&format!("index_{}", md5));
610            if index.is_empty() {
611                index = format!("CREATE INDEX {} ON {} ({});", index_name, options.table_name, index_fields);
612            } else {
613                index = format!("{};\r\nCREATE INDEX {} ON {} ({});", index, index_name, options.table_name, index_fields);
614            }
615        }
616
617        // 生成字段定义
618        for (name, field) in options.table_fields.entries() {
619            let row = br_fields::field("pgsql", name, field.clone());
620            sql = format!("{sql} {row},\r\n");
621        }
622
623        // 添加主键
624        if !unique.is_empty() {
625            sql = sql.trim_end_matches(",\r\n").to_string();
626            sql = format!("{sql},\r\n{unique}");
627        }
628        sql = if sql.trim_end().ends_with(",") {
629            format!("{}\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
630        } else {
631            format!("{},\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
632        };
633
634        // 生成完整的 CREATE TABLE 语句
635        let create_sql = format!(
636            "CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n);\r\n{}",
637            options.table_name, sql, index
638        );
639
640        if self.params.sql {
641            return JsonValue::from(create_sql);
642        }
643
644        
645        // 执行 CREATE TABLE 语句
646        let (state, data) = self.execute(create_sql.as_str());
647        
648        
649        
650        match state {
651            true => JsonValue::from(true),
652            false => {
653                error!("创建表错误: {} - {}", options.table_name, data);
654                JsonValue::from(false)
655            }
656        }
657    }
658
659    fn table_update(&mut self, options: TableOptions) -> JsonValue {
660        // 检查表是否存在
661        if !self.table_is_exist(&options.table_name) {
662            // 表不存在,尝试创建表
663            warn!("表 {} 不存在,尝试创建表", options.table_name);
664            return self.table_create(options);
665        }
666
667        // 清除缓存
668        let cache_key = format!("{}{}", self.default, options.table_name);
669        if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
670            TABLE_FIELDS.lock().unwrap().remove(&cache_key);
671        }
672        
673        let mut sql = vec![];
674        let fields_list = self.table_info(&options.table_name);
675        let mut put = vec![];
676        let mut add = vec![];
677        let mut del = vec![];
678        
679        // 检查需要删除的字段(在新配置中为空)
680        for (key, _) in fields_list.entries() {
681            if options.table_fields[key].is_empty() {
682                del.push(key);
683            }
684        }
685        
686        // 检查需要添加或修改的字段
687        for (name, field) in options.table_fields.entries() {
688            if !fields_list[name].is_empty() {
689                // 字段已存在,检查是否需要更新
690                let old_field = &fields_list[name];
691                
692                // 获取新字段的标题(从 field JSON 对象中)
693                let new_title = field["title"].as_str().unwrap_or("");
694                
695                // 获取旧字段的注释,并从中提取标题
696                // 注释格式通常是 'title|mode|...',标题是第一部分
697                let old_comment = old_field["comment"].as_str().unwrap_or("");
698                let old_title = if !old_comment.is_empty() {
699                    old_comment.split('|').next().unwrap_or("")
700                } else {
701                    ""
702                };
703                
704                // 比较字段名和标题的合并字符串:"{字段名}|{标题}"
705                let new_field_title = format!("{}|{}", name, new_title);
706                let old_field_title = format!("{}|{}", name, old_title);
707                
708                // 如果有差异,需要更新
709                if new_field_title != old_field_title {
710                    put.push(name);
711                    continue;
712                }
713                
714                // 如果字段名和标题都相同,跳过更新
715                continue;
716            } else {
717                // 字段不存在,需要添加
718                add.push(name);
719            }
720        }
721        
722        // 如果没有需要添加、删除或修改的字段,直接返回 -1 表示没有变化
723        if add.is_empty() && del.is_empty() && put.is_empty() {
724            info!("数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新", options.table_name);
725            return JsonValue::from(-1);
726        }
727        
728        // 生成 ALTER TABLE 语句
729        for name in add.iter() {
730            let name = name.to_string();
731            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
732            sql.push(format!("ALTER TABLE {} ADD COLUMN {};\r\n", options.table_name, row));
733        }
734        
735        for name in del.iter() {
736            sql.push(format!("ALTER TABLE {} DROP COLUMN \"{}\";\r\n", options.table_name, name));
737        }
738        
739        for name in put.iter() {
740            let name = name.to_string();
741            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
742            
743            // 解析字段定义,提取类型和注释
744            let comment_parts: Vec<&str> = row.split(" comment ").collect();
745            let field_type_part = if comment_parts.len() > 1 {
746                comment_parts[0].trim()
747            } else {
748                row.trim()
749            };
750            
751            // PostgreSQL ALTER COLUMN 需要分别处理类型和注释
752            // 提取类型部分(去掉字段名,只保留类型定义)
753            let type_def = if let Some(space_pos) = field_type_part.find(' ') {
754                &field_type_part[space_pos + 1..]
755            } else {
756                field_type_part
757            };
758            
759            // 更新字段类型
760            sql.push(format!("ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n", 
761                options.table_name, name, type_def));
762            
763            // 更新注释(如果存在)
764            if comment_parts.len() > 1 {
765                let comment = comment_parts[1].trim_start_matches("'").trim_end_matches("'");
766                sql.push(format!(
767                    "COMMENT ON COLUMN {}.{} IS '{}';\r\n",
768                    options.table_name, name, comment
769                ));
770            }
771        }
772        
773        // 处理唯一索引
774        let mut unique_fields = String::new();
775        let mut unique_name = String::new();
776        for item in options.table_unique.iter() {
777            if unique_fields.is_empty() {
778                unique_fields = format!("\"{}\"", item);
779                unique_name = format!("{}_unique_{}", options.table_name, item);
780            } else {
781                unique_fields = format!("{}, \"{}\"", unique_fields, item);
782                unique_name = format!("{}_{}", unique_name, item);
783            }
784        }
785        
786        if !unique_name.is_empty() {
787            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
788            unique_name = generate_index_name(&format!("unique_{}", md5));
789            
790            // 查询现有唯一索引
791            let (_, index_list) = self.query(format!(
792                "SELECT indexname FROM pg_indexes WHERE tablename = '{}' AND indexdef LIKE '%UNIQUE%'",
793                options.table_name
794            ).as_str());
795            
796            let mut unique_new = vec![];
797            for item in index_list.members() {
798                if let Some(index_name) = item["indexname"].as_str() {
799                    unique_new.push(index_name.to_string());
800                }
801            }
802            
803            // 删除不匹配的唯一索引
804            for item in &unique_new {
805                if unique_name != *item {
806                    sql.push(format!("DROP INDEX IF EXISTS {};\r\n", item));
807                }
808            }
809            
810            // 创建新的唯一索引(如果不存在)
811            if !unique_new.contains(&unique_name) {
812                sql.push(format!(
813                    "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});\r\n",
814                    unique_name, options.table_name, unique_fields
815                ));
816            }
817        }
818        
819        // 处理普通索引
820        let mut index_list = vec![];
821        for row in options.table_index.iter() {
822            let mut index_fields = String::new();
823            let mut index_name = String::new();
824            for item in row {
825                if index_fields.is_empty() {
826                    index_fields = format!("\"{}\"", item);
827                    index_name = format!("{}_index_{}", options.table_name, item);
828                } else {
829                    index_fields = format!("{}, \"{}\"", index_fields, item);
830                    index_name = format!("{}_{}", index_name, item);
831                }
832            }
833            let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
834            index_name = generate_index_name(&format!("index_{}", md5));
835            index_list.push(index_name.clone());
836            
837            // 查询现有索引
838            let (_, existing_indexes) = self.query(format!(
839                "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
840                options.table_name
841            ).as_str());
842            
843            let mut existing_index_names = vec![];
844            for item in existing_indexes.members() {
845                if let Some(index_name) = item["indexname"].as_str() {
846                    existing_index_names.push(index_name.to_string());
847                }
848            }
849            
850            if !existing_index_names.contains(&index_name) {
851                sql.push(format!(
852                    "CREATE INDEX IF NOT EXISTS {} ON {} ({});\r\n",
853                    index_name, options.table_name, index_fields
854                ));
855            }
856        }
857        
858        // 删除不再需要的索引
859        let (_, all_indexes) = self.query(format!(
860            "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
861            options.table_name
862        ).as_str());
863        
864        for item in all_indexes.members() {
865            if let Some(index_name) = item["indexname"].as_str() {
866                // 跳过主键索引和唯一索引(它们由上面的逻辑处理)
867                if !index_name.contains("_pkey") && !index_list.contains(&index_name.to_string()) {
868                    // 检查是否是表相关的索引
869                    if index_name.starts_with(&format!("{}_", options.table_name)) {
870                        sql.push(format!("DROP INDEX IF EXISTS {};\r\n", index_name));
871                    }
872                }
873            }
874        }
875        
876        if self.params.sql {
877            return JsonValue::from(sql.join(""));
878        }
879        
880        // 如果没有需要执行的 SQL,返回 -1 表示没有变化
881        if sql.is_empty() {
882            info!("数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新", options.table_name);
883            return JsonValue::from(-1);
884        }
885        
886        // 执行 SQL 语句
887        for item in sql.iter() {
888            let (state, res) = self.execute(item.as_str());
889            match state {
890                true => {}
891                false => {
892                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
893                    return JsonValue::from(0);
894                }
895            }
896        }
897        
898        JsonValue::from(1)
899    }
900
901    fn table_info(&mut self, table: &str) -> JsonValue {
902        let cache_key = format!("{}{}", self.default, table);
903        {
904            let fields = TABLE_FIELDS.lock().unwrap();
905            if let Some(cached) = fields.get(&cache_key) {
906                return cached.clone();
907            }
908        }
909        
910        // 查询 PostgreSQL 表结构信息
911        let sql = format!(
912            "SELECT 
913                COL.COLUMN_NAME,
914                COL.DATA_TYPE,
915                COL.UDT_NAME,
916                COL.CHARACTER_MAXIMUM_LENGTH,
917                COL.NUMERIC_PRECISION,
918                COL.NUMERIC_SCALE,
919                COL.COLUMN_DEFAULT,
920                COL.IS_NULLABLE,
921                COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT
922            FROM INFORMATION_SCHEMA.COLUMNS COL
923            LEFT JOIN pg_catalog.pg_description DESCRIPTION
924                ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
925                AND DESCRIPTION.objoid = (
926                    SELECT oid FROM pg_catalog.pg_class 
927                    WHERE relname = COL.TABLE_NAME 
928                    AND relnamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = 'public')
929                    LIMIT 1
930                )
931            WHERE COL.TABLE_SCHEMA = 'public' 
932                AND COL.TABLE_NAME = '{}'",
933            table
934        );
935        
936        let (state, data) = self.query(sql.as_str());
937        let mut list = object! {};
938        
939        if state {
940            for item in data.members() {
941                if let Some(field_name) = item["COLUMN_NAME"].as_str() {
942                    let mut row = object! {};
943                    row["field"] = JsonValue::from(field_name);
944                    row["type"] = item["DATA_TYPE"].clone();
945                    row["udt_name"] = item["UDT_NAME"].clone();
946                    row["comment"] = item["COMMENT"].clone();
947                    row["is_nullable"] = item["IS_NULLABLE"].clone();
948                    row["column_default"] = item["COLUMN_DEFAULT"].clone();
949                    row["character_maximum_length"] = item["CHARACTER_MAXIMUM_LENGTH"].clone();
950                    row["numeric_precision"] = item["NUMERIC_PRECISION"].clone();
951                    row["numeric_scale"] = item["NUMERIC_SCALE"].clone();
952                    list[field_name] = row;
953                }
954            }
955            let list_clone = list.clone();
956            TABLE_FIELDS.lock().unwrap().insert(cache_key, list_clone);
957        }
958        list
959    }
960
961    fn table_is_exist(&mut self, name: &str) -> bool {
962        let sql = format!(
963            "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{}'",
964            name
965        );
966        let (state, data) = self.query(sql.as_str());
967        match state {
968            true => {
969                if !data.is_empty() {
970                    if let Some(count) = data[0]["count"].as_i64() {
971                        return count > 0;
972                    }
973                }
974                false
975            }
976            false => false,
977        }
978    }
979
980    fn table(&mut self, _name: &str) -> &mut Pgsql {
981        self
982    }
983
984    fn change_table(&mut self, _name: &str) -> &mut Self {
985        self
986    }
987
988    fn autoinc(&mut self) -> &mut Self {
989        self
990    }
991
992    fn fetch_sql(&mut self) -> &mut Self {
993        self
994    }
995
996    fn order(&mut self, _field: &str, _by: bool) -> &mut Self {
997        self
998    }
999
1000    fn group(&mut self, _field: &str) -> &mut Self {
1001        self
1002    }
1003
1004    fn distinct(&mut self) -> &mut Self {
1005        self
1006    }
1007
1008    fn json(&mut self, _field: &str) -> &mut Self {
1009        self
1010    }
1011
1012    fn location(&mut self, _field: &str) -> &mut Self {
1013        self
1014    }
1015
1016    fn field(&mut self, _field: &str) -> &mut Self {
1017        self
1018    }
1019
1020    fn hidden(&mut self, _name: &str) -> &mut Self {
1021        self
1022    }
1023
1024    fn where_and(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1025        self
1026    }
1027
1028    fn where_or(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1029        self
1030    }
1031
1032    fn where_column(&mut self, _field_a: &str, _compare: &str, _field_b: &str) -> &mut Self {
1033        self
1034    }
1035
1036    fn update_column(&mut self, _field_a: &str, _compare: &str) -> &mut Self {
1037        self
1038    }
1039
1040    fn page(&mut self, _page: i32, _limit: i32) -> &mut Self {
1041        self
1042    }
1043
1044    fn column(&mut self, _field: &str) -> JsonValue {
1045        JsonValue::Null
1046    }
1047
1048    fn count(&mut self) -> JsonValue {
1049        JsonValue::from(0)
1050    }
1051
1052    fn max(&mut self, _field: &str) -> JsonValue {
1053        JsonValue::from(0)
1054    }
1055
1056    fn min(&mut self, _field: &str) -> JsonValue {
1057        JsonValue::from(0)
1058    }
1059
1060    fn sum(&mut self, _field: &str) -> JsonValue {
1061        JsonValue::from(0)
1062    }
1063
1064    fn avg(&mut self, _field: &str) -> JsonValue {
1065        JsonValue::from(0)
1066    }
1067
1068    fn select(&mut self) -> JsonValue {
1069        array![]
1070    }
1071
1072    fn find(&mut self) -> JsonValue {
1073        object! {}
1074    }
1075
1076    fn value(&mut self, _field: &str) -> JsonValue {
1077        JsonValue::Null
1078    }
1079
1080    fn insert(&mut self, _data: JsonValue) -> JsonValue {
1081        JsonValue::from("")
1082    }
1083
1084    fn insert_all(&mut self, _data: JsonValue) -> JsonValue {
1085        array![]
1086    }
1087
1088    fn update(&mut self, _data: JsonValue) -> JsonValue {
1089        JsonValue::from(0)
1090    }
1091
1092    fn update_all(&mut self, _data: JsonValue) -> JsonValue {
1093        JsonValue::from(0)
1094    }
1095
1096    fn delete(&mut self) -> JsonValue {
1097        JsonValue::from(0)
1098    }
1099
1100    fn inc(&mut self, _field: &str, _num: f64) -> &mut Self {
1101        self
1102    }
1103
1104    fn dec(&mut self, _field: &str, _num: f64) -> &mut Self {
1105        self
1106    }
1107
1108    fn buildsql(&mut self) -> String {
1109        String::new()
1110    }
1111
1112    fn join_fields(&mut self, _fields: Vec<&str>) -> &mut Self {
1113        self
1114    }
1115
1116    fn join(&mut self, _main_table: &str, _main_fields: &str, _right_table: &str, _right_fields: &str) -> &mut Self {
1117        self
1118    }
1119
1120    fn join_inner(&mut self, _table: &str, _main_fields: &str, _second_fields: &str) -> &mut Self {
1121        self
1122    }
1123}