1use crate::config::Connection;
2use crate::types::{DbMode, DbResult, Mode, Params, TableOptions};
3use br_pgsql::connect::Connect;
4use br_pgsql::pools::Pools;
5use br_pgsql::{PoolConstraints, PoolOpts};
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info, warn};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::Mutex;
12use std::thread;
13use std::time::Duration;
14
15const PG_MAX_IDENTIFIER_LENGTH: usize = 63;
17
18fn generate_index_name(base_name: &str) -> String {
21 if base_name.len() <= PG_MAX_IDENTIFIER_LENGTH {
22 base_name.to_string()
23 } else {
24 let md5 = br_crypto::md5::encrypt_hex(base_name.as_bytes());
26 if let Some(underscore_pos) = base_name.find('_') {
29 let max_prefix_len = PG_MAX_IDENTIFIER_LENGTH - 6 - 16;
31 let prefix = &base_name[..underscore_pos.min(max_prefix_len)];
32 format!("{}_hash_{}", prefix, &md5[..16])
33 } else {
34 format!("idx_{}", &md5[..(PG_MAX_IDENTIFIER_LENGTH - 4)])
36 }
37 }
38}
39
40lazy_static! {
41 static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> =
42 Arc::new(Mutex::new(HashMap::new()));
43 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
44 static ref TABLE_FIELDS: Arc<Mutex<HashMap<String, JsonValue>>> =
45 Arc::new(Mutex::new(HashMap::new()));
46 static ref POOL_STATS: Arc<Mutex<PoolStats>> = Arc::new(Mutex::new(PoolStats::new()));
48}
49
50#[derive(Debug, Clone)]
52pub struct PoolHealthStatus {
53 pub is_healthy: bool,
55 pub total_connections: usize,
57 pub active_connections: usize,
59 pub utilization: f64,
61 pub idle_connections: usize,
63}
64
65#[derive(Debug, Clone)]
67struct PoolStats {
68 total_connections: usize,
70 active_connections: usize,
72 idle_connections: usize,
74 connections_created: u64,
76 connection_errors: u64,
78 last_check: std::time::Instant,
80}
81
82impl PoolStats {
83 fn new() -> Self {
84 Self {
85 total_connections: 0,
86 active_connections: 0,
87 idle_connections: 0,
88 connections_created: 0,
89 connection_errors: 0,
90 last_check: std::time::Instant::now(),
91 }
92 }
93
94 fn record_connection_created(&mut self) {
96 self.connections_created += 1;
97 self.total_connections += 1;
98 self.last_check = std::time::Instant::now();
99 }
100
101 fn record_connection_error(&mut self) {
103 self.connection_errors += 1;
104 self.last_check = std::time::Instant::now();
105 }
106
107 fn increment_active(&mut self) {
109 self.active_connections += 1;
110 self.last_check = std::time::Instant::now();
111 }
112
113 fn decrement_active(&mut self) {
115 if self.active_connections > 0 {
116 self.active_connections -= 1;
117 }
118 self.last_check = std::time::Instant::now();
119 }
120
121 fn update_idle_count(&mut self, idle_count: usize) {
123 self.idle_connections = idle_count;
124 self.last_check = std::time::Instant::now();
125 }
126
127 fn utilization_rate(&self) -> f64 {
129 if self.total_connections == 0 {
130 0.0
131 } else {
132 self.active_connections as f64 / self.total_connections as f64
133 }
134 }
135}
136
137struct ConnectionGuard;
139
140impl ConnectionGuard {
141 fn new() -> Self {
142 Self
143 }
144}
145
146impl Drop for ConnectionGuard {
147 fn drop(&mut self) {
148 let mut stats = POOL_STATS.lock().unwrap();
150 stats.decrement_active();
151 }
152}
153
154#[derive(Clone)]
155pub struct Pgsql {
156 pub connection: Connection,
158 pub default: String,
160 pub params: Params,
161 pub client: Pools,
162}
163
164impl Pgsql {
165 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
168 let port = connection
169 .hostport
170 .parse::<i32>()
171 .map_err(|e| format!("端口号解析失败: {} ({})", connection.hostport, e))?;
172
173 let cp_connection = connection.clone();
174 let config = object! {
175 debug: cp_connection.debug,
176 username: cp_connection.username,
177 userpass: cp_connection.userpass,
178 database: cp_connection.database,
179 hostname: cp_connection.hostname,
180 hostport: port,
181 charset: cp_connection.charset.str(),
182 };
183 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
184
185 let cpu_cores = num_cpus::get();
188 let optimal_pool_size = (cpu_cores * 2) + 1;
189
190 let calculated_max = config["pool_max"]
193 .as_u32()
194 .filter(|&p| p > 0 && p <= 1000)
195 .unwrap_or(std::cmp::min(optimal_pool_size as u32, 50))
196 as usize; info!(
199 "PostgreSQL 连接池配置 - CPU核心数: {}, 计算最优值: {}, 实际使用: {}",
200 cpu_cores, optimal_pool_size, calculated_max
201 );
202
203 let max_pools = calculated_max;
204
205 let constraints =
206 PoolConstraints::new(0, max_pools).map_err(|e| format!("连接池约束配置失败: {}", e))?;
207
208 let pool_opts = PoolOpts::default()
210 .with_constraints(constraints)
211 .with_reset_connection(true)
212 .with_connect_timeout(Duration::from_secs(5))
213 .with_read_timeout(Duration::from_secs(15))
214 .with_write_timeout(Duration::from_secs(20))
215 .with_tcp_keepalive(Duration::from_secs(5));
216
217 let pools = pgsql.pools_with_opts(pool_opts)?;
219
220 {
222 let mut stats = POOL_STATS.lock().unwrap();
223 stats.record_connection_created();
224 }
225
226 Ok(Self {
227 connection,
228 default: default.clone(),
229 params: Params::default("pgsql"),
230 client: pools,
231 })
232 }
233
234 pub async fn health_check(&mut self) -> DbResult<PoolHealthStatus> {
236 let _stats = POOL_STATS.lock().unwrap().clone();
237
238 let is_healthy = {
239 match self.client.get_guard() {
240 Ok(mut guard) => {
241 match guard.conn().query("SELECT 1") {
242 Ok(_) => {
243 drop(guard); true
246 },
247 Err(e) => {
248 error!("连接池健康检查查询失败: {}", e);
249 false
250 }
251 }
252 },
253 Err(e) => {
254 error!("无法从连接池获取连接进行健康检查: {}", e);
255 false
256 }
257 }
258 };
259
260 let mut current_stats = POOL_STATS.lock().unwrap();
261
262 if current_stats.total_connections == 0 {
264 let cpu_cores = num_cpus::get();
266 let optimal_pool_size = (cpu_cores * 2) + 1;
267 current_stats.total_connections = std::cmp::min(optimal_pool_size, 50);
268 }
269
270 current_stats.idle_connections = if current_stats.total_connections >= current_stats.active_connections {
272 current_stats.total_connections - current_stats.active_connections
273 } else {
274 0
275 };
276
277 Ok(PoolHealthStatus {
278 is_healthy,
279 total_connections: current_stats.total_connections,
280 active_connections: current_stats.active_connections,
281 utilization: current_stats.utilization_rate(),
282 idle_connections: current_stats.idle_connections,
283 })
284 }
285
286 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
287 let thread_id = format!("{:?}", thread::current().id());
288 let key = format!("{}{}", self.default, thread_id);
289
290 let _connection_guard = ConnectionGuard::new();
293
294 {
295 let mut stats = POOL_STATS.lock().unwrap();
296 stats.increment_active();
297
298 let utilization = stats.utilization_rate();
300 if utilization > 0.8 {
301 warn!(
302 "PostgreSQL 连接池使用率过高: {:.1}%, 活跃连接: {}, 总连接: {}",
303 utilization * 100.0,
304 stats.active_connections,
305 stats.total_connections
306 );
307 }
308 }
309
310 let db_opt = {
313 let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
314 if trans_map.get(&*thread_id).is_some() {
315 let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
317 tr_map.get(&key).cloned()
318 } else {
319 None
320 }
321 };
322
323 if let Some(db) = db_opt {
324 let mut t = db.lock().unwrap();
326 match t.query(sql) {
327 Ok(e) => {
328 (true, e.rows)
330 }
331 Err(e) => {
332 error!(
333 "事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
334 thread_id,
335 self.default,
336 self.connection.database,
337 sql,
338 e
339 );
340 (false, JsonValue::from(e.to_string()))
341 }
342 }
343 } else {
344 let mut guard = match self.client.get_guard() {
346 Ok(g) => g,
347 Err(e) => {
348 error!(
349 "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
350 thread_id,
351 self.default,
352 self.connection.database,
353 sql,
354 e
355 );
356 return (false, JsonValue::from(e.to_string()));
357 }
358 };
359
360 println!("DEBUG: About to execute query SQL: {}", sql);
361 let res = guard.conn().query(sql);
362 match res {
363 Ok(e) => {
364 println!("DEBUG: Query SQL executed successfully, returned {} rows", e.rows.len());
366 (true, e.rows)
367 }
368 Err(e) => {
369 println!("DEBUG: Query SQL failed with error: {}", e);
370 error!(
371 "非事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
372 thread_id,
373 self.default,
374 self.connection.database,
375 sql,
376 e
377 );
378 (false, JsonValue::from(e.to_string()))
379 }
380 }
381 }
383 }
384 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
385 let thread_id = format!("{:?}", thread::current().id());
386 let key = format!("{}{}", self.default, thread_id);
387
388 let db_opt = {
391 let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
392 if trans_map.get(&*thread_id).is_some() {
393 let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
395 tr_map.get(&key).cloned()
396 } else {
397 None
398 }
399 };
400
401 if let Some(db) = db_opt {
402 let mut t = db.lock().unwrap();
404 match t.execute(sql) {
405 Ok(e) => {
406 if sql.contains("INSERT") {
408 (true, e.rows)
409 } else {
410 (true, e.affect_count.into())
411 }
412 }
413 Err(e) => {
414 let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
415 "插入"
416 } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
417 "更新"
418 } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
419 "删除"
420 } else {
421 "执行"
422 };
423 error!(
424 "事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
425 operation_type,
426 thread_id,
427 self.default,
428 self.connection.database,
429 self.params.table,
430 sql,
431 e
432 );
433 (false, JsonValue::from(e.to_string()))
434 }
435 }
436 } else {
437 let mut guard = match self.client.get_guard() {
439 Ok(g) => g,
440 Err(e) => {
441 error!(
442 "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
443 thread_id,
444 self.default,
445 self.connection.database,
446 sql,
447 e
448 );
449 return (false, JsonValue::from(e.to_string()));
450 }
451 };
452
453 println!("DEBUG: About to execute SQL: {}", sql);
454 let res = guard.conn().execute(sql);
455 match res {
456 Ok(e) => {
457 println!("DEBUG: SQL executed successfully, affected rows: {}", e.affect_count);
459 if sql.contains("INSERT") {
460 (true, e.rows)
461 } else {
462 (true, e.affect_count.into())
463 }
464 }
465 Err(e) => {
466 println!("DEBUG: SQL failed with error: {}", e);
467 let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
468 "插入"
469 } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
470 "更新"
471 } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
472 "删除"
473 } else {
474 "执行"
475 };
476 error!(
477 "非事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
478 operation_type,
479 thread_id,
480 self.default,
481 self.connection.database,
482 self.params.table,
483 sql,
484 e
485 );
486 (false, JsonValue::from(e.to_string()))
487 }
488 }
489 }
491 }
492}
493
494impl DbMode for Pgsql {
495 fn database_tables(&mut self) -> JsonValue {
496 let sql = "SELECT table_name FROM information_schema.tables
497 WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
498 .to_string();
499 match self.sql(sql.as_str()) {
500 Ok(e) => {
501 let mut list = vec![];
502 for item in e.members() {
503 if let Some(value) = item["table_name"].as_str() {
504 list.push(JsonValue::from(value));
505 }
506 }
507 list.into()
508 }
509 Err(_) => {
510 array![]
511 }
512 }
513 }
514
515 fn database_create(&mut self, name: &str) -> bool {
516 let check_sql = format!("SELECT 1 FROM pg_database WHERE datname = '{}'", name);
518 let (exists_state, exists_data) = self.query(check_sql.as_str());
519
520 if exists_state && !exists_data.is_empty() && exists_data.members().count() > 0 {
521 return true;
523 }
524
525 let db_name = format!("\"{}\"", name);
527
528 let mut sql = format!("CREATE DATABASE {}", db_name);
530
531 if !self.connection.charset.str().is_empty() {
533 let charset_str = self.connection.charset.str();
534 let encoding = match charset_str.as_str() {
535 "utf8" | "utf8mb4" => "UTF8",
536 "latin1" => "LATIN1",
537 _ => "UTF8",
538 };
539 sql = format!("{} ENCODING '{}'", sql, encoding);
540 }
541
542 let (state, data) = self.execute(sql.as_str());
543 match state {
544 true => true, false => {
546 error!("创建数据库失败: {data:?}");
547 false
548 }
549 }
550 }
551
552 fn database_update(&mut self, name: &str, options: JsonValue) -> bool {
553 let db_name = format!("\"{}\"", name);
554 let mut sql_parts = vec![];
555
556 if options.has_key("encoding") {
558 let encoding = options["encoding"].as_str().unwrap_or("UTF8");
559 sql_parts.push(format!("ENCODING '{}'", encoding));
560 }
561
562 if options.has_key("owner") {
564 let owner = options["owner"].as_str().unwrap_or("");
565 sql_parts.push(format!("OWNER = \"{}\"", owner));
566 }
567
568 if sql_parts.is_empty() {
569 return true; }
571
572 let sql = format!("ALTER DATABASE {} {}", db_name, sql_parts.join(" "));
573 let (state, data) = self.execute(sql.as_str());
574
575 match state {
576 true => true,
577 false => {
578 error!("更新数据库失败: {data:?}");
579 false
580 }
581 }
582 }
583}
584
585impl Mode for Pgsql {
586 fn transaction(&mut self) -> bool {
587 let thread_id = format!("{:?}", thread::current().id());
588
589 let is_nested = {
591 let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
592 if let Some(count) = trans_map.get_mut(&*thread_id) {
593 *count += 1;
595 true
596 } else {
597 trans_map.insert(thread_id.clone(), 1);
599 false
600 }
601 };
602
603 if is_nested {
604 return true;
605 }
606
607 let key = format!("{}{}", self.default, thread_id);
608
609 let mut guard = match self.client.get_guard() {
611 Ok(g) => g,
612 Err(e) => {
613 error!("获取事务连接失败: {e}");
614 TRANS
616 .lock()
617 .unwrap_or_else(|e| e.into_inner())
618 .remove(&*thread_id);
619 return false;
620 }
621 };
622
623 let conn = guard.conn().clone();
624 drop(guard);
625
626 TR.lock()
627 .unwrap_or_else(|e| e.into_inner())
628 .insert(key.clone(), Arc::new(Mutex::new(conn)));
629
630 let sql = "START TRANSACTION;".to_string();
631 let (state, _) = self.execute(sql.as_str());
632 match state {
633 true => {
634 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
635 let (state, _) = self.execute(sql.as_str());
636 match state {
637 true => state,
638 false => {
639 TRANS
641 .lock()
642 .unwrap_or_else(|e| e.into_inner())
643 .remove(&*thread_id);
644 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
645 state
646 }
647 }
648 }
649 false => {
650 TRANS
652 .lock()
653 .unwrap_or_else(|e| e.into_inner())
654 .remove(&*thread_id);
655 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
656 state
657 }
658 }
659 }
660
661 fn commit(&mut self) -> bool {
662 let thread_id = format!("{:?}", thread::current().id());
663 let key = format!("{}{}", self.default, thread_id);
664
665 let should_commit = {
667 let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
668 if let Some(count) = trans_map.get_mut(&*thread_id) {
669 if *count > 1 {
670 *count -= 1;
672 false } else {
674 true
676 }
677 } else {
678 error!("提交事务失败: 线程ID: {thread_id} 事务不存在");
680 return false;
681 }
682 };
683
684 if !should_commit {
685 return true;
686 }
687
688 let sql = "COMMIT".to_string();
690 let (state, data) = self.execute(sql.as_str());
691
692 if state {
694 TRANS
696 .lock()
697 .unwrap_or_else(|e| e.into_inner())
698 .remove(&thread_id);
699 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
700 } else {
701 let error_msg = data
702 .as_str()
703 .map(|s| s.to_string())
704 .unwrap_or_else(|| data.to_string());
705 error!(
706 "提交事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
707 thread_id,
708 self.default,
709 self.connection.database,
710 sql,
711 error_msg
712 );
713 }
715 state
716 }
717
718 fn rollback(&mut self) -> bool {
719 let thread_id = format!("{:?}", thread::current().id());
720 let key = format!("{}{}", self.default, thread_id);
721
722 let should_rollback = {
724 let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
725 if let Some(count) = trans_map.get_mut(&*thread_id) {
726 if *count > 1 {
727 *count -= 1;
729 false } else {
731 true
733 }
734 } else {
735 error!("回滚事务失败: 线程ID: {thread_id} 事务不存在");
737 return false;
738 }
739 };
740
741 if !should_rollback {
742 return true;
743 }
744
745 let sql = "ROLLBACK".to_string();
747 let (state, data) = self.execute(sql.as_str());
748
749 TRANS
751 .lock()
752 .unwrap_or_else(|e| e.into_inner())
753 .remove(&thread_id);
754 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
755
756 if !state {
757 let error_msg = data
758 .as_str()
759 .map(|s| s.to_string())
760 .unwrap_or_else(|| data.to_string());
761 error!(
762 "回滚事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
763 thread_id,
764 self.default,
765 self.connection.database,
766 sql,
767 error_msg
768 );
769 }
770 state
771 }
772
773 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
774 let (state, data) = self.query(sql);
775 match state {
776 true => Ok(data),
777 false => Err(data.to_string()),
778 }
779 }
780
781 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
782 let (state, data) = self.execute(sql);
783 match state {
784 true => Ok(data),
785 false => Err(data.to_string()),
786 }
787 }
788
789 fn table_create(&mut self, options: TableOptions) -> JsonValue {
790 println!("!!! TABLE_CREATE FUNCTION CALLED with table: {}", options.table_name);
791
792 let cache_key = format!("{}{}", self.default, options.table_name);
794 if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
795 TABLE_FIELDS.lock().unwrap().remove(&cache_key);
796 }
797
798 let mut sql = String::new();
799 let mut unique = String::new();
800 let mut index = String::new();
801
802 let mut unique_fields = String::new();
804 for item in options.table_unique.iter() {
805 if unique_fields.is_empty() {
806 unique_fields = format!("\"{}\"", item);
807 } else {
808 unique_fields = format!("{}, \"{}\"", unique_fields, item);
809 }
810 }
811 if !unique_fields.is_empty() {
812 let mut unique_name = String::new();
813 for item in options.table_unique.iter() {
814 if unique_name.is_empty() {
815 unique_name = format!("{}_unique_{}", options.table_name, item);
816 } else {
817 unique_name = format!("{}_{}", unique_name, item);
818 }
819 }
820 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
821 unique_name = generate_index_name(&format!("unique_{}", md5));
822 unique = format!("CONSTRAINT {} UNIQUE ({})", unique_name, unique_fields);
823 }
824
825 for row in options.table_index.iter() {
827 let mut index_fields = String::new();
828 let mut index_name = String::new();
829 for item in row {
830 if index_fields.is_empty() {
831 index_fields = format!("\"{}\"", item);
832 index_name = format!("{}_index_{}", options.table_name, item);
833 } else {
834 index_fields = format!("{}, \"{}\"", index_fields, item);
835 index_name = format!("{}_{}", index_name, item);
836 }
837 }
838 let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
839 index_name = generate_index_name(&format!("index_{}", md5));
840 if index.is_empty() {
841 index = format!(
842 "CREATE INDEX {} ON {} ({})",
843 index_name, options.table_name, index_fields
844 );
845 } else {
846 index = format!(
847 "{}\r\nCREATE INDEX {} ON {} ({})",
848 index, index_name, options.table_name, index_fields
849 );
850 }
851 }
852
853 for (name, field) in options.table_fields.entries() {
855 let row = br_fields::field("pgsql", name, field.clone());
856 sql = format!("{sql} {row},\r\n");
857 }
858
859 if !unique.is_empty() {
861 sql = sql.trim_end_matches(",\r\n").to_string();
862 sql = format!("{sql},\r\n{unique}");
863 }
864 sql = if sql.trim_end().ends_with(",") {
865 format!("{}\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
866 } else {
867 format!("{},\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
868 };
869
870 let create_sql = format!(
872 "CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n)",
873 options.table_name, sql
874 );
875
876 if self.params.sql {
877 let mut statements = vec![create_sql];
879 if !index.is_empty() {
880 let index_statements: Vec<&str> = index.split("\r\n").filter(|s| !s.trim().is_empty()).collect();
881 for index_sql in index_statements {
882 let final_sql = if index_sql.trim().ends_with(';') {
883 index_sql.trim().to_string()
884 } else {
885 format!("{};", index_sql.trim())
886 };
887 statements.push(final_sql);
888 }
889 }
890 let full_sql = statements.join("\r\n");
891 println!("DEBUG: Returning SQL in fetch_sql mode: {}", full_sql);
892 return JsonValue::from(full_sql);
893 }
894
895 println!("=== TABLE CREATE DEBUG START ===");
897 println!("DEBUG: params.sql = {}, table_name = {}", self.params.sql, options.table_name);
898 println!("CREATE TABLE SQL: {}", create_sql);
899 let (state, data) = self.execute(&create_sql);
900 println!("CREATE TABLE result: state={}, data={}", state, data);
901 if !state {
902 error!("创建表失败: {} - {}", options.table_name, data);
903 return JsonValue::from(false);
904 }
905
906 if !index.is_empty() {
908 println!("Index statements: {}", index);
909 let index_statements: Vec<&str> = index.split("\r\n").filter(|s| !s.trim().is_empty()).collect();
910 for (i, index_sql) in index_statements.iter().enumerate() {
911 let final_sql = if index_sql.trim().ends_with(';') {
912 index_sql.trim().to_string()
913 } else {
914 format!("{};", index_sql.trim())
915 };
916 println!("Index SQL {}: {}", i+1, final_sql);
917 let (idx_state, idx_data) = self.execute(&final_sql);
918 println!("Index result {}: state={}, data={}", i+1, idx_state, idx_data);
919 if !idx_state {
920 error!("创建索引失败: {} - {}", options.table_name, idx_data);
921 return JsonValue::from(false);
922 }
923 }
924 }
925
926 JsonValue::from(true)
928 }
929
930 fn table_update(&mut self, options: TableOptions) -> JsonValue {
931 if !self.table_is_exist(&options.table_name) {
933 warn!("表 {} 不存在,尝试创建表", options.table_name);
935 return self.table_create(options);
936 }
937
938 let cache_key = format!("{}{}", self.default, options.table_name);
940 if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
941 TABLE_FIELDS.lock().unwrap().remove(&cache_key);
942 }
943
944 let mut sql = vec![];
945 let fields_list = self.table_info(&options.table_name);
946 let mut put = vec![];
947 let mut add = vec![];
948 let mut del = vec![];
949
950 for (key, _) in fields_list.entries() {
952 if options.table_fields[key].is_empty() {
953 del.push(key);
954 }
955 }
956
957 for (name, field) in options.table_fields.entries() {
959 println!("DEBUG: Processing field '{}', exists in DB: {}", name, !fields_list[name].is_empty());
960 let keys: Vec<_> = fields_list.entries().map(|(k, _)| k).collect();
961 println!("DEBUG: fields_list keys: {:?}", keys);
962 println!("DEBUG: Looking for key '{}' in fields_list: {}", name, keys.contains(&name));
963 if !fields_list[name].is_empty() {
964 let old_field = &fields_list[name];
966 println!("DEBUG: Field '{}' exists, old_field: {:?}", name, old_field);
967
968 let new_title = field["title"].as_str().unwrap_or("");
970
971 let old_comment = old_field["comment"].as_str().unwrap_or("");
974 let old_title = if !old_comment.is_empty() {
975 old_comment.split('|').next().unwrap_or("")
976 } else {
977 ""
978 };
979
980 let new_field_title = format!("{}|{}", name, new_title);
982 let old_field_title = format!("{}|{}", name, old_title);
983
984 if new_field_title != old_field_title {
986 put.push(name);
987 continue;
988 }
989
990 continue;
992 } else {
993 add.push(name);
995 }
996 }
997
998 if add.is_empty() && del.is_empty() && put.is_empty() {
1000 info!(
1001 "数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新",
1002 options.table_name
1003 );
1004 return JsonValue::from(-1);
1005 }
1006
1007 for name in add.iter() {
1009 let name_str = name.to_string();
1010
1011 let column_check_sql = format!(
1013 "SELECT COUNT(*) as count FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '{}' AND column_name = '{}'",
1014 options.table_name, name_str
1015 );
1016 println!("DEBUG: Checking column existence with SQL: {}", column_check_sql);
1017 let (check_state, check_result) = self.query(&column_check_sql);
1018
1019 if check_state {
1020 let count = check_result[0]["count"].as_i64().unwrap_or(0);
1021 if count > 0 {
1022 println!("DEBUG: Column '{}' already exists in table '{}', skipping ADD COLUMN", name_str, options.table_name);
1023 continue;
1024 }
1025 } else {
1026 println!("DEBUG: Column check failed for '{}', assuming it doesn't exist", name_str);
1027 }
1028
1029 let row = br_fields::field("pgsql", &name_str, options.table_fields[name_str.as_str()].clone());
1030 let alter_sql = format!(
1031 "ALTER TABLE {} ADD COLUMN {}",
1032 options.table_name, row
1033 );
1034 println!("DEBUG: Adding column with SQL: {}", alter_sql);
1035 sql.push(alter_sql);
1036 }
1037
1038 for name in del.iter() {
1039 sql.push(format!(
1040 "ALTER TABLE {} DROP COLUMN \"{}\"",
1041 options.table_name, name
1042 ));
1043 }
1044
1045 for name in put.iter() {
1046 let name_str = name.to_string();
1047 let row = br_fields::field("pgsql", &name_str, options.table_fields[name_str.as_str()].clone());
1048 println!("DEBUG: br_fields::field returned: {}", row);
1049
1050 let comment_parts: Vec<&str> = row.split(" comment ").collect();
1052 let field_type_part = if comment_parts.len() > 1 {
1053 comment_parts[0].trim()
1054 } else {
1055 row.trim()
1056 };
1057
1058 let type_def = if let Some(space_pos) = field_type_part.find(' ') {
1061 &field_type_part[space_pos + 1..]
1062 } else {
1063 field_type_part
1064 };
1065
1066 sql.push(format!(
1068 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {}",
1069 options.table_name, name_str, type_def
1070 ));
1071
1072 if comment_parts.len() > 1 {
1074 let comment = comment_parts[1]
1075 .trim_start_matches("'")
1076 .trim_end_matches("'");
1077 sql.push(format!(
1078 "COMMENT ON COLUMN {}.{} IS '{}'",
1079 options.table_name, name_str, comment
1080 ));
1081 }
1082
1083 let old_field = self.table_info(&options.table_name);
1085 if old_field[name_str.as_str()].is_object() {
1086 let field_info = &old_field[name_str.as_str()];
1087 let old_default = field_info["column_default"].as_str().unwrap_or("");
1089
1090 if row.contains("not null") && !old_default.contains("not null") {
1092 sql.push(format!(
1093 "ALTER TABLE {} ALTER COLUMN \"{}\" SET NOT NULL",
1094 options.table_name, name_str
1095 ));
1096 }
1097 if let Some(default_start) = row.find("default '") {
1099 if let Some(default_end) = row[default_start + 9..].find('\'') {
1100 let new_default = &row[default_start + 9..default_start + 9 + default_end];
1101 sql.push(format!(
1102 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {}",
1103 options.table_name, name_str, new_default
1104 ));
1105 }
1106 }
1107 }
1108 }
1109
1110 let mut unique_fields = String::new();
1112 let mut unique_name = String::new();
1113 for item in options.table_unique.iter() {
1114 if unique_fields.is_empty() {
1115 unique_fields = format!("\"{}\"", item);
1116 unique_name = format!("{}_unique_{}", options.table_name, item);
1117 } else {
1118 unique_fields = format!("{}, \"{}\"", unique_fields, item);
1119 unique_name = format!("{}_{}", unique_name, item);
1120 }
1121 }
1122
1123 if !unique_name.is_empty() {
1124 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
1125 unique_name = generate_index_name(&format!("unique_{}", md5));
1126
1127 let (_, index_list) = self.query(format!(
1129 "SELECT indexname FROM pg_indexes WHERE tablename = '{}' AND indexdef LIKE '%UNIQUE%'",
1130 options.table_name
1131 ).as_str());
1132
1133 let mut unique_new = vec![];
1134 for item in index_list.members() {
1135 if let Some(index_name) = item["indexname"].as_str() {
1136 unique_new.push(index_name.to_string());
1137 }
1138 }
1139
1140 for item in &unique_new {
1142 if unique_name != *item {
1143 sql.push(format!("DROP INDEX IF EXISTS {}", item));
1144 }
1145 }
1146
1147 if !unique_new.contains(&unique_name) {
1149 sql.push(format!(
1150 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({})",
1151 unique_name, options.table_name, unique_fields
1152 ));
1153 }
1154 }
1155
1156 let mut index_list = vec![];
1158 for row in options.table_index.iter() {
1159 let mut index_fields = String::new();
1160 let mut index_name = String::new();
1161 for item in row {
1162 if index_fields.is_empty() {
1163 index_fields = format!("\"{}\"", item);
1164 index_name = format!("{}_index_{}", options.table_name, item);
1165 } else {
1166 index_fields = format!("{}, \"{}\"", index_fields, item);
1167 index_name = format!("{}_{}", index_name, item);
1168 }
1169 }
1170 let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
1171 index_name = generate_index_name(&format!("index_{}", md5));
1172 index_list.push(index_name.clone());
1173
1174 let (_, existing_indexes) = self.query(
1176 format!(
1177 "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
1178 options.table_name
1179 )
1180 .as_str(),
1181 );
1182
1183 let mut existing_index_names = vec![];
1184 for item in existing_indexes.members() {
1185 if let Some(index_name) = item["indexname"].as_str() {
1186 existing_index_names.push(index_name.to_string());
1187 }
1188 }
1189
1190 if !existing_index_names.contains(&index_name) {
1191 sql.push(format!(
1192 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
1193 index_name, options.table_name, index_fields
1194 ));
1195 }
1196 }
1197
1198 let (_, all_indexes) = self.query(
1200 format!(
1201 "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
1202 options.table_name
1203 )
1204 .as_str(),
1205 );
1206
1207 for item in all_indexes.members() {
1208 if let Some(index_name) = item["indexname"].as_str() {
1209 if !index_name.contains("_pkey") && !index_list.contains(&index_name.to_string()) {
1211 if index_name.starts_with(&format!("{}_", options.table_name)) {
1213 sql.push(format!("DROP INDEX IF EXISTS {}", index_name));
1214 }
1215 }
1216 }
1217 }
1218
1219 if self.params.sql {
1220 return JsonValue::from(sql.join("\r\n"));
1221 }
1222
1223 if sql.is_empty() {
1225 info!(
1226 "数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新",
1227 options.table_name
1228 );
1229 return JsonValue::from(-1);
1230 }
1231
1232 for item in sql.iter() {
1234 let (state, res) = self.execute(item.as_str());
1235 match state {
1236 true => {}
1237 false => {
1238 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
1239 return JsonValue::from(0);
1240 }
1241 }
1242 }
1243
1244 JsonValue::from(1)
1245 }
1246
1247 fn table_info(&mut self, table: &str) -> JsonValue {
1248 let cache_key = format!("{}{}", self.default, table);
1249 {
1250 let fields = TABLE_FIELDS.lock().unwrap();
1251 if let Some(cached) = fields.get(&cache_key) {
1252 return cached.clone();
1253 }
1254 }
1255
1256 let sql = format!(
1258 "SELECT
1259 COL.COLUMN_NAME,
1260 COL.DATA_TYPE,
1261 COL.UDT_NAME,
1262 COL.CHARACTER_MAXIMUM_LENGTH,
1263 COL.NUMERIC_PRECISION,
1264 COL.NUMERIC_SCALE,
1265 COL.COLUMN_DEFAULT,
1266 COL.IS_NULLABLE,
1267 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT
1268 FROM INFORMATION_SCHEMA.COLUMNS COL
1269 LEFT JOIN pg_catalog.pg_description DESCRIPTION
1270 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
1271 AND DESCRIPTION.objoid = (
1272 SELECT oid FROM pg_catalog.pg_class
1273 WHERE relname = COL.TABLE_NAME
1274 AND relnamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = 'public')
1275 LIMIT 1
1276 )
1277 WHERE COL.TABLE_SCHEMA = 'public'
1278 AND COL.TABLE_NAME = '{}'",
1279 table
1280 );
1281
1282 println!("DEBUG: Table info SQL: {}", sql);
1283 let (state, data) = self.query(sql.as_str());
1284 println!("DEBUG: Table info query result: state={}, data={}", state, data);
1285 let mut list = object! {};
1286
1287 if state {
1288 println!("DEBUG: Processing {} data items", data.members().len());
1289 for (i, item) in data.members().enumerate() {
1290 println!("DEBUG: Item {}: {}", i, item);
1291 if let Some(field_name) = item["column_name"].as_str() {
1292 println!("DEBUG: Extracted field_name: '{}'", field_name);
1293 let mut row = object! {};
1294 row["field"] = JsonValue::from(field_name);
1295 row["type"] = item["data_type"].clone();
1296 row["udt_name"] = item["udt_name"].clone();
1297 row["comment"] = item["comment"].clone();
1298 row["is_nullable"] = item["is_nullable"].clone();
1299 row["column_default"] = item["column_default"].clone();
1300 row["character_maximum_length"] = item["character_maximum_length"].clone();
1301 row["numeric_precision"] = item["numeric_precision"].clone();
1302 row["numeric_scale"] = item["numeric_scale"].clone();
1303 list[field_name] = row;
1304 println!("DEBUG: Stored field '{}' in list", field_name);
1305 } else {
1306 println!("DEBUG: Could not extract COLUMN_NAME from item: {}", item);
1307 }
1308 }
1309 let list_clone = list.clone();
1310 println!("DEBUG: Storing {} fields in cache for key: {}", list_clone.len(), cache_key);
1311 TABLE_FIELDS.lock().unwrap().insert(cache_key, list_clone);
1312 }
1313 list
1314 }
1315
1316 fn table_is_exist(&mut self, name: &str) -> bool {
1317 let sql = format!(
1318 "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{}'",
1319 name
1320 );
1321 let (state, data) = self.query(sql.as_str());
1322 match state {
1323 true => {
1324 if !data.is_empty() {
1325 if let Some(count) = data[0]["count"].as_i64() {
1326 return count > 0;
1327 }
1328 }
1329 false
1330 }
1331 false => false,
1332 }
1333 }
1334
1335 fn table(&mut self, name: &str) -> &mut Pgsql {
1336 self.params.table = name.to_string();
1337 self
1338 }
1339
1340 fn change_table(&mut self, name: &str) -> &mut Self {
1341 self.params.table = name.to_string();
1342 self
1343 }
1344
1345 fn autoinc(&mut self) -> &mut Self {
1346 self
1347 }
1348
1349 fn fetch_sql(&mut self) -> &mut Self {
1350 self.params.sql = true;
1351 self
1352 }
1353
1354 fn order(&mut self, _field: &str, _by: bool) -> &mut Self {
1355 self
1356 }
1357
1358 fn group(&mut self, _field: &str) -> &mut Self {
1359 self
1360 }
1361
1362 fn distinct(&mut self) -> &mut Self {
1363 self
1364 }
1365
1366 fn json(&mut self, _field: &str) -> &mut Self {
1367 self
1368 }
1369
1370 fn location(&mut self, _field: &str) -> &mut Self {
1371 self
1372 }
1373
1374 fn field(&mut self, _field: &str) -> &mut Self {
1375 self
1376 }
1377
1378 fn hidden(&mut self, _name: &str) -> &mut Self {
1379 self
1380 }
1381
1382 fn where_and(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1383 self
1384 }
1385
1386 fn where_or(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1387 self
1388 }
1389
1390 fn where_column(&mut self, _field_a: &str, _compare: &str, _field_b: &str) -> &mut Self {
1391 self
1392 }
1393
1394 fn update_column(&mut self, _field_a: &str, _compare: &str) -> &mut Self {
1395 self
1396 }
1397
1398 fn page(&mut self, _page: i32, _limit: i32) -> &mut Self {
1399 self
1400 }
1401
1402 fn column(&mut self, _field: &str) -> JsonValue {
1403 JsonValue::Null
1404 }
1405
1406 fn count(&mut self) -> JsonValue {
1407 JsonValue::from(0)
1408 }
1409
1410 fn max(&mut self, _field: &str) -> JsonValue {
1411 JsonValue::from(0)
1412 }
1413
1414 fn min(&mut self, _field: &str) -> JsonValue {
1415 JsonValue::from(0)
1416 }
1417
1418 fn sum(&mut self, _field: &str) -> JsonValue {
1419 JsonValue::from(0)
1420 }
1421
1422 fn avg(&mut self, _field: &str) -> JsonValue {
1423 JsonValue::from(0)
1424 }
1425
1426 fn select(&mut self) -> JsonValue {
1427 array![]
1428 }
1429
1430 fn find(&mut self) -> JsonValue {
1431 object! {}
1432 }
1433
1434 fn value(&mut self, _field: &str) -> JsonValue {
1435 JsonValue::Null
1436 }
1437
1438 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1439 let thread_id = format!("{:?}", thread::current().id());
1440
1441 let fields_list = self.table_info(&self.params.table.clone());
1442
1443 let mut fields = vec![];
1444 let mut values = vec![];
1445
1446 if !self.params.autoinc && data["id"].is_empty() {
1448 data["id"] =
1449 format!("{:X}", chrono::Local::now().timestamp_nanos_opt().unwrap()).into();
1450 }
1451
1452 for (field, value) in data.entries() {
1453 fields.push(format!("\"{}\"", field));
1454
1455 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1456 if value.is_empty() {
1457 values.push("NULL".to_string());
1458 continue;
1459 }
1460 let comment = fields_list[field]["comment"].to_string();
1461 let srid = comment
1462 .split("|")
1463 .collect::<Vec<&str>>()
1464 .last()
1465 .unwrap_or(&"4326")
1466 .to_string();
1467 let location = value.to_string().replace(",", " ");
1468 values.push(format!("ST_GeomFromText('POINT({})',{})", location, srid));
1469 continue;
1470 }
1471
1472 if value.is_string() || value.is_array() || value.is_object() {
1473 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1474 continue;
1475 } else if value.is_number() || value.is_boolean() || value.is_null() {
1476 values.push(format!("{}", value));
1477 continue;
1478 } else {
1479 values.push(format!("'{}'", value));
1480 continue;
1481 }
1482 }
1483
1484 let fields = fields.join(",");
1485 let values = values.join(",");
1486
1487 let sql = format!(
1488 "INSERT INTO {} ({}) VALUES ({});",
1489 self.params.table, fields, values
1490 );
1491
1492 if self.params.sql {
1493 return JsonValue::from(sql);
1494 }
1495
1496 let (state, ids) = self.execute(sql.as_str());
1497 match state {
1498 true => match self.params.autoinc {
1499 true => ids.clone(),
1500 false => data["id"].clone(),
1501 },
1502 false => {
1503 let error_msg = ids
1504 .as_str()
1505 .map(|s| s.to_string())
1506 .unwrap_or_else(|| ids.to_string());
1507 error!(
1508 "插入记录失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
1509 thread_id,
1510 self.default,
1511 self.connection.database,
1512 self.params.table,
1513 sql,
1514 error_msg
1515 );
1516 JsonValue::from("")
1517 }
1518 }
1519 }
1520
1521 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1522 let thread_id = format!("{:?}", thread::current().id());
1523 let list = array![];
1524
1525 if !data.is_array() {
1526 return list;
1527 }
1528
1529 let fields_list = self.table_info(&self.params.table.clone());
1530 let mut fields = vec![];
1531
1532 use std::collections::HashSet;
1534 let mut field_set = HashSet::new();
1535 for record in data.members() {
1536 for (field_name, _) in record.entries() {
1537 field_set.insert(field_name.to_string());
1538 }
1539 }
1540
1541 let mut field_vec: Vec<String> = field_set.into_iter().collect();
1543 field_vec.sort();
1544 for field_name in &field_vec {
1545 fields.push(format!("\"{}\"", field_name));
1546 }
1547
1548 if !self.params.autoinc {
1550 for i in 0..data.len() {
1551 if data[i]["id"].is_empty() {
1552 data[i]["id"] = format!(
1553 "{:X}",
1554 chrono::Local::now().timestamp_nanos_opt().unwrap() + i as i64
1555 )
1556 .into();
1557 }
1558 }
1559 }
1560
1561 let mut values_list = vec![];
1562 for record in data.members() {
1563 let mut values = vec![];
1564
1565 for field_name in &field_vec {
1567 let value = record[field_name].clone();
1568
1569 if self.params.location.has_key(field_name)
1570 && !self.params.location[field_name].is_empty()
1571 {
1572 if value.is_empty() {
1573 values.push("NULL".to_string());
1574 continue;
1575 }
1576 let comment = fields_list[field_name]["comment"].to_string();
1577 let srid = comment
1578 .split("|")
1579 .collect::<Vec<&str>>()
1580 .last()
1581 .unwrap_or(&"4326")
1582 .to_string();
1583 let location = value.to_string().replace(",", " ");
1584 values.push(format!("ST_GeomFromText('POINT({})',{})", location, srid));
1585 continue;
1586 }
1587
1588 if value.is_string() || value.is_array() || value.is_object() {
1589 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1590 } else if value.is_number() || value.is_boolean() || value.is_null() {
1591 values.push(format!("{}", value));
1592 } else {
1593 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1594 }
1595 }
1596
1597 values_list.push(format!("({})", values.join(",")));
1598 }
1599
1600 let fields = fields.join(",");
1601 let values = values_list.join(",");
1602
1603 let sql = format!(
1604 "INSERT INTO {} ({}) VALUES {};",
1605 self.params.table, fields, values
1606 );
1607
1608 if self.params.sql {
1609 return JsonValue::from(sql);
1610 }
1611
1612 if self.params.table.is_empty() {
1614 error!("批量插入失败:表名为空 - 线程ID: {}", thread_id);
1615 return JsonValue::from("");
1616 }
1617
1618 let (state, result) = self.execute(sql.as_str());
1619 match state {
1620 true => {
1621 if self.params.autoinc {
1623 result.clone()
1624 } else {
1625 JsonValue::from(data.len() as i64)
1627 }
1628 }
1629 false => {
1630 let error_msg = result
1631 .as_str()
1632 .map(|s| s.to_string())
1633 .unwrap_or_else(|| result.to_string());
1634 error!(
1635 "批量插入失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
1636 thread_id,
1637 self.default,
1638 self.connection.database,
1639 self.params.table,
1640 sql,
1641 error_msg
1642 );
1643 JsonValue::from("")
1644 }
1645 }
1646 }
1647
1648 fn update(&mut self, _data: JsonValue) -> JsonValue {
1649 JsonValue::from(0)
1650 }
1651
1652 fn update_all(&mut self, _data: JsonValue) -> JsonValue {
1653 JsonValue::from(0)
1654 }
1655
1656 fn delete(&mut self) -> JsonValue {
1657 JsonValue::from(0)
1658 }
1659
1660 fn inc(&mut self, _field: &str, _num: f64) -> &mut Self {
1661 self
1662 }
1663
1664 fn dec(&mut self, _field: &str, _num: f64) -> &mut Self {
1665 self
1666 }
1667
1668 fn buildsql(&mut self) -> String {
1669 String::new()
1670 }
1671
1672 fn join_fields(&mut self, _fields: Vec<&str>) -> &mut Self {
1673 self
1674 }
1675
1676 fn join(
1677 &mut self,
1678 _main_table: &str,
1679 _main_fields: &str,
1680 _right_table: &str,
1681 _right_fields: &str,
1682 ) -> &mut Self {
1683 self
1684 }
1685
1686 fn join_inner(&mut self, _table: &str, _main_fields: &str, _second_fields: &str) -> &mut Self {
1687 self
1688 }
1689}