Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::types::{DbMode, DbResult, Mode, Params, TableOptions};
3use br_pgsql::connect::Connect;
4use br_pgsql::pools::Pools;
5use br_pgsql::{PoolConstraints, PoolOpts};
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info, warn};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::Mutex;
12use std::thread;
13use std::time::Duration;
14
15/// PostgreSQL 标识符最大长度(NAMEDATALEN - 1)
16const PG_MAX_IDENTIFIER_LENGTH: usize = 63;
17
18/// 生成 PostgreSQL 索引名称
19/// 如果名称超过 63 个字符,使用 MD5 哈希缩短
20fn generate_index_name(base_name: &str) -> String {
21    if base_name.len() <= PG_MAX_IDENTIFIER_LENGTH {
22        base_name.to_string()
23    } else {
24        // 使用 MD5 哈希生成固定长度的名称
25        let md5 = br_crypto::md5::encrypt_hex(base_name.as_bytes());
26        // 保留表名前缀的一部分,然后加上哈希值
27        // 格式:{table}_hash_{hash},确保总长度不超过 63 个字符
28        if let Some(underscore_pos) = base_name.find('_') {
29            // 计算可用长度:63 - "_hash_" (6) - MD5 前 16 个字符 (16) = 最多 41 个字符用于前缀
30            let max_prefix_len = PG_MAX_IDENTIFIER_LENGTH - 6 - 16;
31            let prefix = &base_name[..underscore_pos.min(max_prefix_len)];
32            format!("{}_hash_{}", prefix, &md5[..16])
33        } else {
34            // 如果找不到下划线,直接使用哈希值
35            format!("idx_{}", &md5[..(PG_MAX_IDENTIFIER_LENGTH - 4)])
36        }
37    }
38}
39
lazy_static! {
    /// Connections held for open transactions, keyed by `{default}{thread_id}`.
    /// `transaction` inserts an entry; `commit` (on success) and `rollback` remove it.
    static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> =
        Arc::new(Mutex::new(HashMap::new()));
    /// Transaction nesting depth per thread, keyed by the `Debug` form of the thread id.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    /// JSON values cached under `{default}{table_name}`; `table_create` evicts the
    /// entry of the table it creates. Presumably per-table field metadata —
    /// confirm against the code that populates it.
    static ref TABLE_FIELDS: Arc<Mutex<HashMap<String, JsonValue>>> =
        Arc::new(Mutex::new(HashMap::new()));
    /// Connection pool health monitoring statistics.
    static ref POOL_STATS: Arc<Mutex<PoolStats>> = Arc::new(Mutex::new(PoolStats::new()));
}
49
/// Snapshot of the connection pool's health, as returned by `Pgsql::health_check`.
#[derive(Debug, Clone)]
pub struct PoolHealthStatus {
    /// Whether the pool is considered healthy.
    pub is_healthy: bool,
    /// Total number of connections.
    pub total_connections: usize,
    /// Number of connections currently in use.
    pub active_connections: usize,
    /// Pool utilization as a fraction (0.0 - 1.0).
    pub utilization: f64,
    /// Number of idle connections.
    pub idle_connections: usize,
}
64
/// Counters describing the connection pool.
#[derive(Debug, Clone)]
struct PoolStats {
    /// Total number of connections.
    total_connections: usize,
    /// Number of connections currently in use.
    active_connections: usize,
    /// Number of idle connections.
    idle_connections: usize,
    /// Number of connections created so far.
    connections_created: u64,
    /// Number of connection errors seen so far.
    connection_errors: u64,
    /// Time of the most recent update to these counters.
    last_check: std::time::Instant,
}

impl PoolStats {
    /// Creates a zeroed set of counters stamped with the current time.
    fn new() -> Self {
        let now = std::time::Instant::now();
        Self {
            total_connections: 0,
            active_connections: 0,
            idle_connections: 0,
            connections_created: 0,
            connection_errors: 0,
            last_check: now,
        }
    }

    /// Refreshes `last_check`; every mutating method ends with this.
    fn touch(&mut self) {
        self.last_check = std::time::Instant::now();
    }

    /// Records a newly created connection.
    fn record_connection_created(&mut self) {
        self.connections_created += 1;
        self.total_connections += 1;
        self.touch();
    }

    /// Records a connection error.
    fn record_connection_error(&mut self) {
        self.connection_errors += 1;
        self.touch();
    }

    /// Adds one active connection.
    fn increment_active(&mut self) {
        self.active_connections += 1;
        self.touch();
    }

    /// Removes one active connection; the count never goes below zero.
    fn decrement_active(&mut self) {
        self.active_connections = self.active_connections.saturating_sub(1);
        self.touch();
    }

    /// Overwrites the idle connection count.
    fn update_idle_count(&mut self, idle_count: usize) {
        self.idle_connections = idle_count;
        self.touch();
    }

    /// Returns active / total, or 0.0 when there are no connections.
    fn utilization_rate(&self) -> f64 {
        match self.total_connections {
            0 => 0.0,
            total => self.active_connections as f64 / total as f64,
        }
    }
}
136
137/// RAII连接计数器,确保离开作用域时自动减少活跃连接计数
138struct ConnectionGuard;
139
140impl ConnectionGuard {
141    fn new() -> Self {
142        Self
143    }
144}
145
146impl Drop for ConnectionGuard {
147    fn drop(&mut self) {
148        // 自动减少活跃连接计数
149        let mut stats = POOL_STATS.lock().unwrap();
150        stats.decrement_active();
151    }
152}
153
/// PostgreSQL backend handle: connection settings, query-builder state and
/// the connection pool.
#[derive(Clone)]
pub struct Pgsql {
    /// Current connection configuration.
    pub connection: Connection,
    /// Name of the currently selected configuration; combined with the thread
    /// id to build the key under which a transaction connection is stored.
    pub default: String,
    /// Query parameters, initialised with `Params::default("pgsql")`.
    pub params: Params,
    /// Connection pool that non-transactional statements borrow from.
    pub client: Pools,
}
163
164impl Pgsql {
165    /// 连接数据库(参考 mysql 的实现方式)
166    /// 使用 PoolConstraints 和 PoolOpts 配置连接池
167    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
168        let port = connection
169            .hostport
170            .parse::<i32>()
171            .map_err(|e| format!("端口号解析失败: {} ({})", connection.hostport, e))?;
172
173        let cp_connection = connection.clone();
174        let config = object! {
175            debug: cp_connection.debug,
176            username: cp_connection.username,
177            userpass: cp_connection.userpass,
178            database: cp_connection.database,
179            hostname: cp_connection.hostname,
180            hostport: port,
181            charset: cp_connection.charset.str(),
182        };
183        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
184
185        // 配置连接池约束(参考 mysql 的实现)
186        // 智能计算最优连接池大小:CPU核心数 × 2 + 1
187        let cpu_cores = num_cpus::get();
188        let optimal_pool_size = (cpu_cores * 2) + 1;
189
190        // 从配置中读取 pool_max,如果没有则使用智能计算的默认值
191        // 但是限制在合理范围内,避免过度连接
192        let calculated_max = config["pool_max"]
193            .as_u32()
194            .filter(|&p| p > 0 && p <= 1000)
195            .unwrap_or(std::cmp::min(optimal_pool_size as u32, 50))
196            as usize; // 最大不超过50,保证稳定性
197
198        info!(
199            "PostgreSQL 连接池配置 - CPU核心数: {}, 计算最优值: {}, 实际使用: {}",
200            cpu_cores, optimal_pool_size, calculated_max
201        );
202
203        let max_pools = calculated_max;
204
205        let constraints =
206            PoolConstraints::new(0, max_pools).map_err(|e| format!("连接池约束配置失败: {}", e))?;
207
208        // 配置连接池选项(参考 mysql 的实现)
209        let pool_opts = PoolOpts::default()
210            .with_constraints(constraints)
211            .with_reset_connection(true)
212            .with_connect_timeout(Duration::from_secs(5))
213            .with_read_timeout(Duration::from_secs(15))
214            .with_write_timeout(Duration::from_secs(20))
215            .with_tcp_keepalive(Duration::from_secs(5));
216
217        // 使用新的连接池配置方式(参考 mysql 的实现)
218        let pools = pgsql.pools_with_opts(pool_opts)?;
219
220        // 初始化连接池统计
221        {
222            let mut stats = POOL_STATS.lock().unwrap();
223            stats.record_connection_created();
224        }
225
226        Ok(Self {
227            connection,
228            default: default.clone(),
229            params: Params::default("pgsql"),
230            client: pools,
231        })
232    }
233
234    /// 连接池健康检查(修复版)
235    pub async fn health_check(&mut self) -> DbResult<PoolHealthStatus> {
236        let _stats = POOL_STATS.lock().unwrap().clone();
237        
238        let is_healthy = {
239            match self.client.get_guard() {
240                Ok(mut guard) => {
241                    match guard.conn().query("SELECT 1") {
242                        Ok(_) => {
243                            // 成功获取连接并执行查询,说明健康
244                            drop(guard); // 确保连接返回池中
245                            true
246                        },
247                        Err(e) => {
248                            error!("连接池健康检查查询失败: {}", e);
249                            false
250                        }
251                    }
252                },
253                Err(e) => {
254                    error!("无法从连接池获取连接进行健康检查: {}", e);
255                    false
256                }
257            }
258        };
259        
260        let mut current_stats = POOL_STATS.lock().unwrap();
261        
262        // 更新总连接数(如果之前没有正确设置)
263        if current_stats.total_connections == 0 {
264            // 根据连接池配置设置合理的总连接数
265            let cpu_cores = num_cpus::get();
266            let optimal_pool_size = (cpu_cores * 2) + 1;
267            current_stats.total_connections = std::cmp::min(optimal_pool_size, 50);
268        }
269        
270        // 计算空闲连接数
271        current_stats.idle_connections = if current_stats.total_connections >= current_stats.active_connections {
272            current_stats.total_connections - current_stats.active_connections
273        } else {
274            0
275        };
276        
277        Ok(PoolHealthStatus {
278            is_healthy,
279            total_connections: current_stats.total_connections,
280            active_connections: current_stats.active_connections,
281            utilization: current_stats.utilization_rate(),
282            idle_connections: current_stats.idle_connections,
283        })
284    }
285
286    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
287        let thread_id = format!("{:?}", thread::current().id());
288        let key = format!("{}{}", self.default, thread_id);
289
290        // === 连接池监控 ===
291        // 使用RAII模式确保连接计数正确减少
292        let _connection_guard = ConnectionGuard::new();
293
294        {
295            let mut stats = POOL_STATS.lock().unwrap();
296            stats.increment_active();
297
298            // 检查连接池健康状态
299            let utilization = stats.utilization_rate();
300            if utilization > 0.8 {
301                warn!(
302                    "PostgreSQL 连接池使用率过高: {:.1}%, 活跃连接: {}, 总连接: {}",
303                    utilization * 100.0,
304                    stats.active_connections,
305                    stats.total_connections
306                );
307            }
308        }
309
310        // === 事务环境 ===
311        // 修复:在单次锁持有期间同时检查 TRANS 和 TR,避免检查-使用时间窗口问题
312        let db_opt = {
313            let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
314            if trans_map.get(&*thread_id).is_some() {
315                // 在事务环境中,同时获取 TR 锁查找连接
316                let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
317                tr_map.get(&key).cloned()
318            } else {
319                None
320            }
321        };
322
323        if let Some(db) = db_opt {
324            // 事务环境:使用事务连接
325            let mut t = db.lock().unwrap();
326            match t.query(sql) {
327                Ok(e) => {
328                    // 查询成功(已移除正常日志)
329                    (true, e.rows)
330                }
331                Err(e) => {
332                    error!(
333                        "事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
334                        thread_id,
335                        self.default,
336                        self.connection.database,
337                        sql,
338                        e
339                    );
340                    (false, JsonValue::from(e.to_string()))
341                }
342            }
343        } else {
344            // 非事务环境下使用 ConnectionGuard
345            let mut guard = match self.client.get_guard() {
346                Ok(g) => g,
347                Err(e) => {
348                    error!(
349                        "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
350                        thread_id,
351                        self.default,
352                        self.connection.database,
353                        sql,
354                        e
355                    );
356                    return (false, JsonValue::from(e.to_string()));
357                }
358            };
359
360            println!("DEBUG: About to execute query SQL: {}", sql);
361            let res = guard.conn().query(sql);
362            match res {
363                Ok(e) => {
364                    // 查询成功
365                    println!("DEBUG: Query SQL executed successfully, returned {} rows", e.rows.len());
366                    (true, e.rows)
367                }
368                Err(e) => {
369                    println!("DEBUG: Query SQL failed with error: {}", e);
370                    error!(
371                        "非事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
372                        thread_id,
373                        self.default,
374                        self.connection.database,
375                        sql,
376                        e
377                    );
378                    (false, JsonValue::from(e.to_string()))
379                }
380            }
381            // guard 离开作用域时自动归还连接
382        }
383    }
384    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
385        let thread_id = format!("{:?}", thread::current().id());
386        let key = format!("{}{}", self.default, thread_id);
387
388        // === 事务环境 ===
389        // 修复:在单次锁持有期间同时检查 TRANS 和 TR,避免检查-使用时间窗口问题
390        let db_opt = {
391            let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
392            if trans_map.get(&*thread_id).is_some() {
393                // 在事务环境中,同时获取 TR 锁查找连接
394                let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
395                tr_map.get(&key).cloned()
396            } else {
397                None
398            }
399        };
400
401        if let Some(db) = db_opt {
402            // 事务环境:使用事务连接
403            let mut t = db.lock().unwrap();
404            match t.execute(sql) {
405                Ok(e) => {
406                    // 提交成功(已移除正常日志)
407                    if sql.contains("INSERT") {
408                        (true, e.rows)
409                    } else {
410                        (true, e.affect_count.into())
411                    }
412                }
413                Err(e) => {
414                    let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
415                        "插入"
416                    } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
417                        "更新"
418                    } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
419                        "删除"
420                    } else {
421                        "执行"
422                    };
423                    error!(
424                        "事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
425                        operation_type,
426                        thread_id,
427                        self.default,
428                        self.connection.database,
429                        self.params.table,
430                        sql,
431                        e
432                    );
433                    (false, JsonValue::from(e.to_string()))
434                }
435            }
436        } else {
437            // 非事务环境下使用 ConnectionGuard
438            let mut guard = match self.client.get_guard() {
439                Ok(g) => g,
440                Err(e) => {
441                    error!(
442                        "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
443                        thread_id,
444                        self.default,
445                        self.connection.database,
446                        sql,
447                        e
448                    );
449                    return (false, JsonValue::from(e.to_string()));
450                }
451            };
452
453            println!("DEBUG: About to execute SQL: {}", sql);
454            let res = guard.conn().execute(sql);
455            match res {
456                Ok(e) => {
457                    // 提交成功
458                    println!("DEBUG: SQL executed successfully, affected rows: {}", e.affect_count);
459                    if sql.contains("INSERT") {
460                        (true, e.rows)
461                    } else {
462                        (true, e.affect_count.into())
463                    }
464                }
465                Err(e) => {
466                    println!("DEBUG: SQL failed with error: {}", e);
467                    let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
468                        "插入"
469                    } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
470                        "更新"
471                    } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
472                        "删除"
473                    } else {
474                        "执行"
475                    };
476                    error!(
477                        "非事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
478                        operation_type,
479                        thread_id,
480                        self.default,
481                        self.connection.database,
482                        self.params.table,
483                        sql,
484                        e
485                    );
486                    (false, JsonValue::from(e.to_string()))
487                }
488            }
489            // guard 离开作用域时自动归还连接
490        }
491    }
492}
493
494impl DbMode for Pgsql {
495    fn database_tables(&mut self) -> JsonValue {
496        let sql = "SELECT table_name FROM information_schema.tables 
497             WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
498            .to_string();
499        match self.sql(sql.as_str()) {
500            Ok(e) => {
501                let mut list = vec![];
502                for item in e.members() {
503                    if let Some(value) = item["table_name"].as_str() {
504                        list.push(JsonValue::from(value));
505                    }
506                }
507                list.into()
508            }
509            Err(_) => {
510                array![]
511            }
512        }
513    }
514
515    fn database_create(&mut self, name: &str) -> bool {
516        // 先检查数据库是否已存在
517        let check_sql = format!("SELECT 1 FROM pg_database WHERE datname = '{}'", name);
518        let (exists_state, exists_data) = self.query(check_sql.as_str());
519
520        if exists_state && !exists_data.is_empty() && exists_data.members().count() > 0 {
521            // 数据库已存在,返回 true
522            return true;
523        }
524
525        // 转义数据库名称(PostgreSQL 使用双引号)
526        let db_name = format!("\"{}\"", name);
527
528        // 构建 CREATE DATABASE SQL
529        let mut sql = format!("CREATE DATABASE {}", db_name);
530
531        // 添加字符集配置(PostgreSQL 使用 ENCODING)
532        if !self.connection.charset.str().is_empty() {
533            let charset_str = self.connection.charset.str();
534            let encoding = match charset_str.as_str() {
535                "utf8" | "utf8mb4" => "UTF8",
536                "latin1" => "LATIN1",
537                _ => "UTF8",
538            };
539            sql = format!("{} ENCODING '{}'", sql, encoding);
540        }
541
542        let (state, data) = self.execute(sql.as_str());
543        match state {
544            true => true, // PostgreSQL 的 CREATE DATABASE 成功时返回空结果
545            false => {
546                error!("创建数据库失败: {data:?}");
547                false
548            }
549        }
550    }
551
552    fn database_update(&mut self, name: &str, options: JsonValue) -> bool {
553        let db_name = format!("\"{}\"", name);
554        let mut sql_parts = vec![];
555
556        // 更新字符集
557        if options.has_key("encoding") {
558            let encoding = options["encoding"].as_str().unwrap_or("UTF8");
559            sql_parts.push(format!("ENCODING '{}'", encoding));
560        }
561
562        // 更新所有者
563        if options.has_key("owner") {
564            let owner = options["owner"].as_str().unwrap_or("");
565            sql_parts.push(format!("OWNER = \"{}\"", owner));
566        }
567
568        if sql_parts.is_empty() {
569            return true; // 没有需要更新的内容
570        }
571
572        let sql = format!("ALTER DATABASE {} {}", db_name, sql_parts.join(" "));
573        let (state, data) = self.execute(sql.as_str());
574
575        match state {
576            true => true,
577            false => {
578                error!("更新数据库失败: {data:?}");
579                false
580            }
581        }
582    }
583}
584
585impl Mode for Pgsql {
586    fn transaction(&mut self) -> bool {
587        let thread_id = format!("{:?}", thread::current().id());
588
589        // 修复:在单次锁持有期间完成嵌套计数检查和更新,保证原子性
590        let is_nested = {
591            let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
592            if let Some(count) = trans_map.get_mut(&*thread_id) {
593                // 嵌套事务:增加计数(在同一把锁内完成)
594                *count += 1;
595                true
596            } else {
597                // 新事务:初始化计数
598                trans_map.insert(thread_id.clone(), 1);
599                false
600            }
601        };
602
603        if is_nested {
604            return true;
605        }
606
607        let key = format!("{}{}", self.default, thread_id);
608
609        // 获取连接并克隆(事务需要长期持有)
610        let mut guard = match self.client.get_guard() {
611            Ok(g) => g,
612            Err(e) => {
613                error!("获取事务连接失败: {e}");
614                // 清理已插入的 TRANS 记录
615                TRANS
616                    .lock()
617                    .unwrap_or_else(|e| e.into_inner())
618                    .remove(&*thread_id);
619                return false;
620            }
621        };
622
623        let conn = guard.conn().clone();
624        drop(guard);
625
626        TR.lock()
627            .unwrap_or_else(|e| e.into_inner())
628            .insert(key.clone(), Arc::new(Mutex::new(conn)));
629
630        let sql = "START TRANSACTION;".to_string();
631        let (state, _) = self.execute(sql.as_str());
632        match state {
633            true => {
634                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
635                let (state, _) = self.execute(sql.as_str());
636                match state {
637                    true => state,
638                    false => {
639                        // 清理资源:先 TRANS 后 TR(保持一致的锁顺序)
640                        TRANS
641                            .lock()
642                            .unwrap_or_else(|e| e.into_inner())
643                            .remove(&*thread_id);
644                        TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
645                        state
646                    }
647                }
648            }
649            false => {
650                // 清理资源:先 TRANS 后 TR(保持一致的锁顺序)
651                TRANS
652                    .lock()
653                    .unwrap_or_else(|e| e.into_inner())
654                    .remove(&*thread_id);
655                TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
656                state
657            }
658        }
659    }
660
661    fn commit(&mut self) -> bool {
662        let thread_id = format!("{:?}", thread::current().id());
663        let key = format!("{}{}", self.default, thread_id);
664
665        // 修复:在单次锁持有期间完成嵌套计数检查和更新,保证原子性
666        let should_commit = {
667            let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
668            if let Some(count) = trans_map.get_mut(&*thread_id) {
669                if *count > 1 {
670                    // 嵌套事务:减少计数(在同一把锁内完成)
671                    *count -= 1;
672                    false // 不需要执行 COMMIT
673                } else {
674                    // 最外层事务:标记为需要执行 COMMIT
675                    true
676                }
677            } else {
678                // 事务不存在,可能是已经被清理或从未创建
679                error!("提交事务失败: 线程ID: {thread_id} 事务不存在");
680                return false;
681            }
682        };
683
684        if !should_commit {
685            return true;
686        }
687
688        // 最外层事务:执行提交
689        let sql = "COMMIT".to_string();
690        let (state, data) = self.execute(sql.as_str());
691
692        // 修复:仅在成功时清理资源,失败时保留资源以便重试或回滚
693        if state {
694            // 清理资源:先 TRANS 后 TR(保持一致的锁顺序)
695            TRANS
696                .lock()
697                .unwrap_or_else(|e| e.into_inner())
698                .remove(&thread_id);
699            TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
700        } else {
701            let error_msg = data
702                .as_str()
703                .map(|s| s.to_string())
704                .unwrap_or_else(|| data.to_string());
705            error!(
706                "提交事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
707                thread_id,
708                self.default,
709                self.connection.database,
710                sql,
711                error_msg
712            );
713            // 注意:失败时不清理资源,保留 TRANS 和 TR 以便后续重试或回滚
714        }
715        state
716    }
717
718    fn rollback(&mut self) -> bool {
719        let thread_id = format!("{:?}", thread::current().id());
720        let key = format!("{}{}", self.default, thread_id);
721
722        // 修复:在单次锁持有期间完成嵌套计数检查和更新,保证原子性
723        let should_rollback = {
724            let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
725            if let Some(count) = trans_map.get_mut(&*thread_id) {
726                if *count > 1 {
727                    // 嵌套事务:减少计数(在同一把锁内完成)
728                    *count -= 1;
729                    false // 不需要执行 ROLLBACK
730                } else {
731                    // 最外层事务:标记为需要执行 ROLLBACK
732                    true
733                }
734            } else {
735                // 事务不存在,可能是已经被清理或从未创建
736                error!("回滚事务失败: 线程ID: {thread_id} 事务不存在");
737                return false;
738            }
739        };
740
741        if !should_rollback {
742            return true;
743        }
744
745        // 最外层事务:执行回滚
746        let sql = "ROLLBACK".to_string();
747        let (state, data) = self.execute(sql.as_str());
748
749        // 修复:无论成功与否都清理资源(回滚失败时也需要清理,避免资源泄漏)
750        TRANS
751            .lock()
752            .unwrap_or_else(|e| e.into_inner())
753            .remove(&thread_id);
754        TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
755
756        if !state {
757            let error_msg = data
758                .as_str()
759                .map(|s| s.to_string())
760                .unwrap_or_else(|| data.to_string());
761            error!(
762                "回滚事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
763                thread_id,
764                self.default,
765                self.connection.database,
766                sql,
767                error_msg
768            );
769        }
770        state
771    }
772
773    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
774        let (state, data) = self.query(sql);
775        match state {
776            true => Ok(data),
777            false => Err(data.to_string()),
778        }
779    }
780
781    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
782        let (state, data) = self.execute(sql);
783        match state {
784            true => Ok(data),
785            false => Err(data.to_string()),
786        }
787    }
788
789    fn table_create(&mut self, options: TableOptions) -> JsonValue {
790        println!("!!! TABLE_CREATE FUNCTION CALLED with table: {}", options.table_name);
791        
792        // 清除缓存
793        let cache_key = format!("{}{}", self.default, options.table_name);
794        if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
795            TABLE_FIELDS.lock().unwrap().remove(&cache_key);
796        }
797
798        let mut sql = String::new();
799        let mut unique = String::new();
800        let mut index = String::new();
801
802        // 处理唯一索引
803        let mut unique_fields = String::new();
804        for item in options.table_unique.iter() {
805            if unique_fields.is_empty() {
806                unique_fields = format!("\"{}\"", item);
807            } else {
808                unique_fields = format!("{}, \"{}\"", unique_fields, item);
809            }
810        }
811        if !unique_fields.is_empty() {
812            let mut unique_name = String::new();
813            for item in options.table_unique.iter() {
814                if unique_name.is_empty() {
815                    unique_name = format!("{}_unique_{}", options.table_name, item);
816                } else {
817                    unique_name = format!("{}_{}", unique_name, item);
818                }
819            }
820            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
821            unique_name = generate_index_name(&format!("unique_{}", md5));
822            unique = format!("CONSTRAINT {} UNIQUE ({})", unique_name, unique_fields);
823        }
824
825        // 处理普通索引
826        for row in options.table_index.iter() {
827            let mut index_fields = String::new();
828            let mut index_name = String::new();
829            for item in row {
830                if index_fields.is_empty() {
831                    index_fields = format!("\"{}\"", item);
832                    index_name = format!("{}_index_{}", options.table_name, item);
833                } else {
834                    index_fields = format!("{}, \"{}\"", index_fields, item);
835                    index_name = format!("{}_{}", index_name, item);
836                }
837            }
838            let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
839            index_name = generate_index_name(&format!("index_{}", md5));
840            if index.is_empty() {
841                index = format!(
842                    "CREATE INDEX {} ON {} ({})",
843                    index_name, options.table_name, index_fields
844                );
845            } else {
846                index = format!(
847                    "{}\r\nCREATE INDEX {} ON {} ({})",
848                    index, index_name, options.table_name, index_fields
849                );
850            }
851        }
852
853        // 生成字段定义
854        for (name, field) in options.table_fields.entries() {
855            let row = br_fields::field("pgsql", name, field.clone());
856            sql = format!("{sql} {row},\r\n");
857        }
858
859        // 添加主键
860        if !unique.is_empty() {
861            sql = sql.trim_end_matches(",\r\n").to_string();
862            sql = format!("{sql},\r\n{unique}");
863        }
864        sql = if sql.trim_end().ends_with(",") {
865            format!("{}\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
866        } else {
867            format!("{},\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
868        };
869
870        // 生成完整的 CREATE TABLE 语句(修复版:分割执行)
871        let create_sql = format!(
872            "CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n)",
873            options.table_name, sql
874        );
875
876        if self.params.sql {
877            // 如果只是查看SQL,返回分割后的语句列表
878            let mut statements = vec![create_sql];
879            if !index.is_empty() {
880                let index_statements: Vec<&str> = index.split("\r\n").filter(|s| !s.trim().is_empty()).collect();
881                for index_sql in index_statements {
882                    let final_sql = if index_sql.trim().ends_with(';') {
883                        index_sql.trim().to_string()
884                    } else {
885                        format!("{};", index_sql.trim())
886                    };
887                    statements.push(final_sql);
888                }
889            }
890            let full_sql = statements.join("\r\n");
891            println!("DEBUG: Returning SQL in fetch_sql mode: {}", full_sql);
892            return JsonValue::from(full_sql);
893        }
894
895        // 强制调试输出
896        println!("=== TABLE CREATE DEBUG START ===");
897        println!("DEBUG: params.sql = {}, table_name = {}", self.params.sql, options.table_name);
898        println!("CREATE TABLE SQL: {}", create_sql);
899        let (state, data) = self.execute(&create_sql);
900        println!("CREATE TABLE result: state={}, data={}", state, data);
901        if !state {
902            error!("创建表失败: {} - {}", options.table_name, data);
903            return JsonValue::from(false);
904        }
905
906        // 2. 如果有索引语句,分割执行
907        if !index.is_empty() {
908            println!("Index statements: {}", index);
909            let index_statements: Vec<&str> = index.split("\r\n").filter(|s| !s.trim().is_empty()).collect();
910            for (i, index_sql) in index_statements.iter().enumerate() {
911                let final_sql = if index_sql.trim().ends_with(';') {
912                    index_sql.trim().to_string()
913                } else {
914                    format!("{};", index_sql.trim())
915                };
916                println!("Index SQL {}: {}", i+1, final_sql);
917                let (idx_state, idx_data) = self.execute(&final_sql);
918                println!("Index result {}: state={}, data={}", i+1, idx_state, idx_data);
919                if !idx_state {
920                    error!("创建索引失败: {} - {}", options.table_name, idx_data);
921                    return JsonValue::from(false);
922                }
923            }
924        }
925
926        // 所有操作都成功完成
927        JsonValue::from(true)
928    }
929
930    fn table_update(&mut self, options: TableOptions) -> JsonValue {
931        // 检查表是否存在
932        if !self.table_is_exist(&options.table_name) {
933            // 表不存在,尝试创建表
934            warn!("表 {} 不存在,尝试创建表", options.table_name);
935            return self.table_create(options);
936        }
937
938        // 清除缓存
939        let cache_key = format!("{}{}", self.default, options.table_name);
940        if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
941            TABLE_FIELDS.lock().unwrap().remove(&cache_key);
942        }
943
944        let mut sql = vec![];
945        let fields_list = self.table_info(&options.table_name);
946        let mut put = vec![];
947        let mut add = vec![];
948        let mut del = vec![];
949
950        // 检查需要删除的字段(在新配置中为空)
951        for (key, _) in fields_list.entries() {
952            if options.table_fields[key].is_empty() {
953                del.push(key);
954            }
955        }
956
957        // 检查需要添加或修改的字段
958        for (name, field) in options.table_fields.entries() {
959            println!("DEBUG: Processing field '{}', exists in DB: {}", name, !fields_list[name].is_empty());
960            let keys: Vec<_> = fields_list.entries().map(|(k, _)| k).collect();
961            println!("DEBUG: fields_list keys: {:?}", keys);
962            println!("DEBUG: Looking for key '{}' in fields_list: {}", name, keys.contains(&name));
963            if !fields_list[name].is_empty() {
964                // 字段已存在,检查是否需要更新
965                let old_field = &fields_list[name];
966                println!("DEBUG: Field '{}' exists, old_field: {:?}", name, old_field);
967
968                // 获取新字段的标题(从 field JSON 对象中)
969                let new_title = field["title"].as_str().unwrap_or("");
970
971                // 获取旧字段的注释,并从中提取标题
972                // 注释格式通常是 'title|mode|...',标题是第一部分
973                let old_comment = old_field["comment"].as_str().unwrap_or("");
974                let old_title = if !old_comment.is_empty() {
975                    old_comment.split('|').next().unwrap_or("")
976                } else {
977                    ""
978                };
979
980                // 比较字段名和标题的合并字符串:"{字段名}|{标题}"
981                let new_field_title = format!("{}|{}", name, new_title);
982                let old_field_title = format!("{}|{}", name, old_title);
983
984                // 如果有差异,需要更新
985                if new_field_title != old_field_title {
986                    put.push(name);
987                    continue;
988                }
989
990                // 如果字段名和标题都相同,跳过更新
991                continue;
992            } else {
993                // 字段不存在,需要添加
994                add.push(name);
995            }
996        }
997
998        // 如果没有需要添加、删除或修改的字段,直接返回 -1 表示没有变化
999        if add.is_empty() && del.is_empty() && put.is_empty() {
1000            info!(
1001                "数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新",
1002                options.table_name
1003            );
1004            return JsonValue::from(-1);
1005        }
1006
1007        // 生成 ALTER TABLE 语句 - 改进列存在检查
1008        for name in add.iter() {
1009            let name_str = name.to_string();
1010            
1011            // 更安全的列存在检查 - 使用更精确的查询
1012            let column_check_sql = format!(
1013                "SELECT COUNT(*) as count FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '{}' AND column_name = '{}'",
1014                options.table_name, name_str
1015            );
1016            println!("DEBUG: Checking column existence with SQL: {}", column_check_sql);
1017            let (check_state, check_result) = self.query(&column_check_sql);
1018            
1019            if check_state {
1020                let count = check_result[0]["count"].as_i64().unwrap_or(0);
1021                if count > 0 {
1022                    println!("DEBUG: Column '{}' already exists in table '{}', skipping ADD COLUMN", name_str, options.table_name);
1023                    continue;
1024                }
1025            } else {
1026                println!("DEBUG: Column check failed for '{}', assuming it doesn't exist", name_str);
1027            }
1028            
1029            let row = br_fields::field("pgsql", &name_str, options.table_fields[name_str.as_str()].clone());
1030            let alter_sql = format!(
1031                "ALTER TABLE {} ADD COLUMN {}",
1032                options.table_name, row
1033            );
1034            println!("DEBUG: Adding column with SQL: {}", alter_sql);
1035            sql.push(alter_sql);
1036        }
1037
1038        for name in del.iter() {
1039            sql.push(format!(
1040                "ALTER TABLE {} DROP COLUMN \"{}\"",
1041                options.table_name, name
1042            ));
1043        }
1044
1045        for name in put.iter() {
1046            let name_str = name.to_string();
1047            let row = br_fields::field("pgsql", &name_str, options.table_fields[name_str.as_str()].clone());
1048            println!("DEBUG: br_fields::field returned: {}", row);
1049
1050            // 解析字段定义,提取类型和注释
1051            let comment_parts: Vec<&str> = row.split(" comment ").collect();
1052            let field_type_part = if comment_parts.len() > 1 {
1053                comment_parts[0].trim()
1054            } else {
1055                row.trim()
1056            };
1057
1058            // PostgreSQL ALTER COLUMN 需要分别处理类型和注释
1059            // 提取类型部分(去掉字段名,只保留类型定义)
1060            let type_def = if let Some(space_pos) = field_type_part.find(' ') {
1061                &field_type_part[space_pos + 1..]
1062            } else {
1063                field_type_part
1064            };
1065
1066            // 更新字段类型
1067            sql.push(format!(
1068                "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {}",
1069                options.table_name, name_str, type_def
1070            ));
1071
1072            // 更新注释(如果存在)
1073            if comment_parts.len() > 1 {
1074                let comment = comment_parts[1]
1075                    .trim_start_matches("'")
1076                    .trim_end_matches("'");
1077                sql.push(format!(
1078                    "COMMENT ON COLUMN {}.{} IS '{}'",
1079                    options.table_name, name_str, comment
1080                ));
1081            }
1082
1083            // 更新默认值和可空性(需要从原始字段定义中解析)
1084            let old_field = self.table_info(&options.table_name);
1085            if old_field[name_str.as_str()].is_object() {
1086                let field_info = &old_field[name_str.as_str()];
1087                // 检查是否需要更新默认值
1088                let old_default = field_info["column_default"].as_str().unwrap_or("");
1089                
1090                // 如果新的字段定义包含 "not null" 但旧的没有,或者包含不同的默认值
1091                if row.contains("not null") && !old_default.contains("not null") {
1092                    sql.push(format!(
1093                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET NOT NULL",
1094                        options.table_name, name_str
1095                    ));
1096                }
1097                // 如果包含默认值,处理默认值更新
1098                if let Some(default_start) = row.find("default '") {
1099                    if let Some(default_end) = row[default_start + 9..].find('\'') {
1100                        let new_default = &row[default_start + 9..default_start + 9 + default_end];
1101                        sql.push(format!(
1102                            "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {}",
1103                            options.table_name, name_str, new_default
1104                        ));
1105                    }
1106                }
1107            }
1108        }
1109
1110        // 处理唯一索引
1111        let mut unique_fields = String::new();
1112        let mut unique_name = String::new();
1113        for item in options.table_unique.iter() {
1114            if unique_fields.is_empty() {
1115                unique_fields = format!("\"{}\"", item);
1116                unique_name = format!("{}_unique_{}", options.table_name, item);
1117            } else {
1118                unique_fields = format!("{}, \"{}\"", unique_fields, item);
1119                unique_name = format!("{}_{}", unique_name, item);
1120            }
1121        }
1122
1123        if !unique_name.is_empty() {
1124            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
1125            unique_name = generate_index_name(&format!("unique_{}", md5));
1126
1127            // 查询现有唯一索引
1128            let (_, index_list) = self.query(format!(
1129                "SELECT indexname FROM pg_indexes WHERE tablename = '{}' AND indexdef LIKE '%UNIQUE%'",
1130                options.table_name
1131            ).as_str());
1132
1133            let mut unique_new = vec![];
1134            for item in index_list.members() {
1135                if let Some(index_name) = item["indexname"].as_str() {
1136                    unique_new.push(index_name.to_string());
1137                }
1138            }
1139
1140            // 删除不匹配的唯一索引
1141            for item in &unique_new {
1142                if unique_name != *item {
1143                    sql.push(format!("DROP INDEX IF EXISTS {}", item));
1144                }
1145            }
1146
1147            // 创建新的唯一索引(如果不存在)
1148            if !unique_new.contains(&unique_name) {
1149                sql.push(format!(
1150                    "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({})",
1151                    unique_name, options.table_name, unique_fields
1152                ));
1153            }
1154        }
1155
1156        // 处理普通索引
1157        let mut index_list = vec![];
1158        for row in options.table_index.iter() {
1159            let mut index_fields = String::new();
1160            let mut index_name = String::new();
1161            for item in row {
1162                if index_fields.is_empty() {
1163                    index_fields = format!("\"{}\"", item);
1164                    index_name = format!("{}_index_{}", options.table_name, item);
1165                } else {
1166                    index_fields = format!("{}, \"{}\"", index_fields, item);
1167                    index_name = format!("{}_{}", index_name, item);
1168                }
1169            }
1170            let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
1171            index_name = generate_index_name(&format!("index_{}", md5));
1172            index_list.push(index_name.clone());
1173
1174            // 查询现有索引
1175            let (_, existing_indexes) = self.query(
1176                format!(
1177                    "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
1178                    options.table_name
1179                )
1180                .as_str(),
1181            );
1182
1183            let mut existing_index_names = vec![];
1184            for item in existing_indexes.members() {
1185                if let Some(index_name) = item["indexname"].as_str() {
1186                    existing_index_names.push(index_name.to_string());
1187                }
1188            }
1189
1190            if !existing_index_names.contains(&index_name) {
1191                sql.push(format!(
1192                    "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
1193                    index_name, options.table_name, index_fields
1194                ));
1195            }
1196        }
1197
1198        // 删除不再需要的索引
1199        let (_, all_indexes) = self.query(
1200            format!(
1201                "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
1202                options.table_name
1203            )
1204            .as_str(),
1205        );
1206
1207        for item in all_indexes.members() {
1208            if let Some(index_name) = item["indexname"].as_str() {
1209                // 跳过主键索引和唯一索引(它们由上面的逻辑处理)
1210                if !index_name.contains("_pkey") && !index_list.contains(&index_name.to_string()) {
1211                    // 检查是否是表相关的索引
1212                    if index_name.starts_with(&format!("{}_", options.table_name)) {
1213                        sql.push(format!("DROP INDEX IF EXISTS {}", index_name));
1214                    }
1215                }
1216            }
1217        }
1218
1219        if self.params.sql {
1220            return JsonValue::from(sql.join("\r\n"));
1221        }
1222
1223        // 如果没有需要执行的 SQL,返回 -1 表示没有变化
1224        if sql.is_empty() {
1225            info!(
1226                "数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新",
1227                options.table_name
1228            );
1229            return JsonValue::from(-1);
1230        }
1231
1232        // 执行 SQL 语句
1233        for item in sql.iter() {
1234            let (state, res) = self.execute(item.as_str());
1235            match state {
1236                true => {}
1237                false => {
1238                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
1239                    return JsonValue::from(0);
1240                }
1241            }
1242        }
1243
1244        JsonValue::from(1)
1245    }
1246
1247    fn table_info(&mut self, table: &str) -> JsonValue {
1248        let cache_key = format!("{}{}", self.default, table);
1249        {
1250            let fields = TABLE_FIELDS.lock().unwrap();
1251            if let Some(cached) = fields.get(&cache_key) {
1252                return cached.clone();
1253            }
1254        }
1255
1256        // 查询 PostgreSQL 表结构信息
1257        let sql = format!(
1258            "SELECT 
1259                COL.COLUMN_NAME,
1260                COL.DATA_TYPE,
1261                COL.UDT_NAME,
1262                COL.CHARACTER_MAXIMUM_LENGTH,
1263                COL.NUMERIC_PRECISION,
1264                COL.NUMERIC_SCALE,
1265                COL.COLUMN_DEFAULT,
1266                COL.IS_NULLABLE,
1267                COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT
1268            FROM INFORMATION_SCHEMA.COLUMNS COL
1269            LEFT JOIN pg_catalog.pg_description DESCRIPTION
1270                ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
1271                AND DESCRIPTION.objoid = (
1272                    SELECT oid FROM pg_catalog.pg_class 
1273                    WHERE relname = COL.TABLE_NAME 
1274                    AND relnamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = 'public')
1275                    LIMIT 1
1276                )
1277            WHERE COL.TABLE_SCHEMA = 'public' 
1278                AND COL.TABLE_NAME = '{}'",
1279            table
1280        );
1281
1282        println!("DEBUG: Table info SQL: {}", sql);
1283        let (state, data) = self.query(sql.as_str());
1284        println!("DEBUG: Table info query result: state={}, data={}", state, data);
1285        let mut list = object! {};
1286
1287        if state {
1288            println!("DEBUG: Processing {} data items", data.members().len());
1289            for (i, item) in data.members().enumerate() {
1290                println!("DEBUG: Item {}: {}", i, item);
1291                if let Some(field_name) = item["column_name"].as_str() {
1292                    println!("DEBUG: Extracted field_name: '{}'", field_name);
1293                    let mut row = object! {};
1294                    row["field"] = JsonValue::from(field_name);
1295                    row["type"] = item["data_type"].clone();
1296                    row["udt_name"] = item["udt_name"].clone();
1297                    row["comment"] = item["comment"].clone();
1298                    row["is_nullable"] = item["is_nullable"].clone();
1299                    row["column_default"] = item["column_default"].clone();
1300                    row["character_maximum_length"] = item["character_maximum_length"].clone();
1301                    row["numeric_precision"] = item["numeric_precision"].clone();
1302                    row["numeric_scale"] = item["numeric_scale"].clone();
1303                    list[field_name] = row;
1304                    println!("DEBUG: Stored field '{}' in list", field_name);
1305                } else {
1306                    println!("DEBUG: Could not extract COLUMN_NAME from item: {}", item);
1307                }
1308            }
1309            let list_clone = list.clone();
1310            println!("DEBUG: Storing {} fields in cache for key: {}", list_clone.len(), cache_key);
1311            TABLE_FIELDS.lock().unwrap().insert(cache_key, list_clone);
1312        }
1313        list
1314    }
1315
1316    fn table_is_exist(&mut self, name: &str) -> bool {
1317        let sql = format!(
1318            "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{}'",
1319            name
1320        );
1321        let (state, data) = self.query(sql.as_str());
1322        match state {
1323            true => {
1324                if !data.is_empty() {
1325                    if let Some(count) = data[0]["count"].as_i64() {
1326                        return count > 0;
1327                    }
1328                }
1329                false
1330            }
1331            false => false,
1332        }
1333    }
1334
1335    fn table(&mut self, name: &str) -> &mut Pgsql {
1336        self.params.table = name.to_string();
1337        self
1338    }
1339
1340    fn change_table(&mut self, name: &str) -> &mut Self {
1341        self.params.table = name.to_string();
1342        self
1343    }
1344
1345    fn autoinc(&mut self) -> &mut Self {
1346        self
1347    }
1348
1349    fn fetch_sql(&mut self) -> &mut Self {
1350        self.params.sql = true;
1351        self
1352    }
1353
1354    fn order(&mut self, _field: &str, _by: bool) -> &mut Self {
1355        self
1356    }
1357
1358    fn group(&mut self, _field: &str) -> &mut Self {
1359        self
1360    }
1361
1362    fn distinct(&mut self) -> &mut Self {
1363        self
1364    }
1365
1366    fn json(&mut self, _field: &str) -> &mut Self {
1367        self
1368    }
1369
1370    fn location(&mut self, _field: &str) -> &mut Self {
1371        self
1372    }
1373
1374    fn field(&mut self, _field: &str) -> &mut Self {
1375        self
1376    }
1377
1378    fn hidden(&mut self, _name: &str) -> &mut Self {
1379        self
1380    }
1381
1382    fn where_and(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1383        self
1384    }
1385
1386    fn where_or(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1387        self
1388    }
1389
1390    fn where_column(&mut self, _field_a: &str, _compare: &str, _field_b: &str) -> &mut Self {
1391        self
1392    }
1393
1394    fn update_column(&mut self, _field_a: &str, _compare: &str) -> &mut Self {
1395        self
1396    }
1397
1398    fn page(&mut self, _page: i32, _limit: i32) -> &mut Self {
1399        self
1400    }
1401
1402    fn column(&mut self, _field: &str) -> JsonValue {
1403        JsonValue::Null
1404    }
1405
1406    fn count(&mut self) -> JsonValue {
1407        JsonValue::from(0)
1408    }
1409
1410    fn max(&mut self, _field: &str) -> JsonValue {
1411        JsonValue::from(0)
1412    }
1413
1414    fn min(&mut self, _field: &str) -> JsonValue {
1415        JsonValue::from(0)
1416    }
1417
1418    fn sum(&mut self, _field: &str) -> JsonValue {
1419        JsonValue::from(0)
1420    }
1421
1422    fn avg(&mut self, _field: &str) -> JsonValue {
1423        JsonValue::from(0)
1424    }
1425
1426    fn select(&mut self) -> JsonValue {
1427        array![]
1428    }
1429
1430    fn find(&mut self) -> JsonValue {
1431        object! {}
1432    }
1433
1434    fn value(&mut self, _field: &str) -> JsonValue {
1435        JsonValue::Null
1436    }
1437
1438    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1439        let thread_id = format!("{:?}", thread::current().id());
1440
1441        let fields_list = self.table_info(&self.params.table.clone());
1442
1443        let mut fields = vec![];
1444        let mut values = vec![];
1445
1446        // 处理自动递增 ID
1447        if !self.params.autoinc && data["id"].is_empty() {
1448            data["id"] =
1449                format!("{:X}", chrono::Local::now().timestamp_nanos_opt().unwrap()).into();
1450        }
1451
1452        for (field, value) in data.entries() {
1453            fields.push(format!("\"{}\"", field));
1454
1455            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1456                if value.is_empty() {
1457                    values.push("NULL".to_string());
1458                    continue;
1459                }
1460                let comment = fields_list[field]["comment"].to_string();
1461                let srid = comment
1462                    .split("|")
1463                    .collect::<Vec<&str>>()
1464                    .last()
1465                    .unwrap_or(&"4326")
1466                    .to_string();
1467                let location = value.to_string().replace(",", " ");
1468                values.push(format!("ST_GeomFromText('POINT({})',{})", location, srid));
1469                continue;
1470            }
1471
1472            if value.is_string() || value.is_array() || value.is_object() {
1473                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1474                continue;
1475            } else if value.is_number() || value.is_boolean() || value.is_null() {
1476                values.push(format!("{}", value));
1477                continue;
1478            } else {
1479                values.push(format!("'{}'", value));
1480                continue;
1481            }
1482        }
1483
1484        let fields = fields.join(",");
1485        let values = values.join(",");
1486
1487        let sql = format!(
1488            "INSERT INTO {} ({}) VALUES ({});",
1489            self.params.table, fields, values
1490        );
1491
1492        if self.params.sql {
1493            return JsonValue::from(sql);
1494        }
1495
1496        let (state, ids) = self.execute(sql.as_str());
1497        match state {
1498            true => match self.params.autoinc {
1499                true => ids.clone(),
1500                false => data["id"].clone(),
1501            },
1502            false => {
1503                let error_msg = ids
1504                    .as_str()
1505                    .map(|s| s.to_string())
1506                    .unwrap_or_else(|| ids.to_string());
1507                error!(
1508                    "插入记录失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
1509                    thread_id,
1510                    self.default,
1511                    self.connection.database,
1512                    self.params.table,
1513                    sql,
1514                    error_msg
1515                );
1516                JsonValue::from("")
1517            }
1518        }
1519    }
1520
1521    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1522        let thread_id = format!("{:?}", thread::current().id());
1523        let list = array![];
1524
1525        if !data.is_array() {
1526            return list;
1527        }
1528
1529        let fields_list = self.table_info(&self.params.table.clone());
1530        let mut fields = vec![];
1531
1532        // 获取所有记录的字段列表的并集
1533        use std::collections::HashSet;
1534        let mut field_set = HashSet::new();
1535        for record in data.members() {
1536            for (field_name, _) in record.entries() {
1537                field_set.insert(field_name.to_string());
1538            }
1539        }
1540
1541        // 转换为排序的字段列表(确保顺序一致性)
1542        let mut field_vec: Vec<String> = field_set.into_iter().collect();
1543        field_vec.sort();
1544        for field_name in &field_vec {
1545            fields.push(format!("\"{}\"", field_name));
1546        }
1547
1548        // 处理自动递增 ID
1549        if !self.params.autoinc {
1550            for i in 0..data.len() {
1551                if data[i]["id"].is_empty() {
1552                    data[i]["id"] = format!(
1553                        "{:X}",
1554                        chrono::Local::now().timestamp_nanos_opt().unwrap() + i as i64
1555                    )
1556                    .into();
1557                }
1558            }
1559        }
1560
1561        let mut values_list = vec![];
1562        for record in data.members() {
1563            let mut values = vec![];
1564
1565            // 按照字段列表的顺序构建值
1566            for field_name in &field_vec {
1567                let value = record[field_name].clone();
1568
1569                if self.params.location.has_key(field_name)
1570                    && !self.params.location[field_name].is_empty()
1571                {
1572                    if value.is_empty() {
1573                        values.push("NULL".to_string());
1574                        continue;
1575                    }
1576                    let comment = fields_list[field_name]["comment"].to_string();
1577                    let srid = comment
1578                        .split("|")
1579                        .collect::<Vec<&str>>()
1580                        .last()
1581                        .unwrap_or(&"4326")
1582                        .to_string();
1583                    let location = value.to_string().replace(",", " ");
1584                    values.push(format!("ST_GeomFromText('POINT({})',{})", location, srid));
1585                    continue;
1586                }
1587
1588                if value.is_string() || value.is_array() || value.is_object() {
1589                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1590                } else if value.is_number() || value.is_boolean() || value.is_null() {
1591                    values.push(format!("{}", value));
1592                } else {
1593                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1594                }
1595            }
1596
1597            values_list.push(format!("({})", values.join(",")));
1598        }
1599
1600        let fields = fields.join(",");
1601        let values = values_list.join(",");
1602
1603        let sql = format!(
1604            "INSERT INTO {} ({}) VALUES {};",
1605            self.params.table, fields, values
1606        );
1607
1608        if self.params.sql {
1609            return JsonValue::from(sql);
1610        }
1611
1612        // 调试信息:确保表名正确
1613        if self.params.table.is_empty() {
1614            error!("批量插入失败:表名为空 - 线程ID: {}", thread_id);
1615            return JsonValue::from("");
1616        }
1617
1618        let (state, result) = self.execute(sql.as_str());
1619        match state {
1620            true => {
1621                // 对于批量插入,如果没有自动递增ID,返回插入的记录数
1622                if self.params.autoinc {
1623                    result.clone()
1624                } else {
1625                    // 返回插入的记录数量
1626                    JsonValue::from(data.len() as i64)
1627                }
1628            }
1629            false => {
1630                let error_msg = result
1631                    .as_str()
1632                    .map(|s| s.to_string())
1633                    .unwrap_or_else(|| result.to_string());
1634                error!(
1635                    "批量插入失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
1636                    thread_id,
1637                    self.default,
1638                    self.connection.database,
1639                    self.params.table,
1640                    sql,
1641                    error_msg
1642                );
1643                JsonValue::from("")
1644            }
1645        }
1646    }
1647
1648    fn update(&mut self, _data: JsonValue) -> JsonValue {
1649        JsonValue::from(0)
1650    }
1651
1652    fn update_all(&mut self, _data: JsonValue) -> JsonValue {
1653        JsonValue::from(0)
1654    }
1655
1656    fn delete(&mut self) -> JsonValue {
1657        JsonValue::from(0)
1658    }
1659
1660    fn inc(&mut self, _field: &str, _num: f64) -> &mut Self {
1661        self
1662    }
1663
1664    fn dec(&mut self, _field: &str, _num: f64) -> &mut Self {
1665        self
1666    }
1667
1668    fn buildsql(&mut self) -> String {
1669        String::new()
1670    }
1671
1672    fn join_fields(&mut self, _fields: Vec<&str>) -> &mut Self {
1673        self
1674    }
1675
1676    fn join(
1677        &mut self,
1678        _main_table: &str,
1679        _main_fields: &str,
1680        _right_table: &str,
1681        _right_fields: &str,
1682    ) -> &mut Self {
1683        self
1684    }
1685
1686    fn join_inner(&mut self, _table: &str, _main_fields: &str, _second_fields: &str) -> &mut Self {
1687        self
1688    }
1689}