1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use std::sync::Mutex;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use br_pgsql::connect::Connect;
12use br_pgsql::pools::Pools;
13use br_pgsql::{PoolConstraints, PoolOpts};
14
15const PG_MAX_IDENTIFIER_LENGTH: usize = 63;
17
18fn generate_index_name(base_name: &str) -> String {
21 if base_name.len() <= PG_MAX_IDENTIFIER_LENGTH {
22 base_name.to_string()
23 } else {
24 let md5 = br_crypto::md5::encrypt_hex(base_name.as_bytes());
26 if let Some(underscore_pos) = base_name.find('_') {
29 let max_prefix_len = PG_MAX_IDENTIFIER_LENGTH - 6 - 16;
31 let prefix = &base_name[..underscore_pos.min(max_prefix_len)];
32 format!("{}_hash_{}", prefix, &md5[..16])
33 } else {
34 format!("idx_{}", &md5[..(PG_MAX_IDENTIFIER_LENGTH - 4)])
36 }
37 }
38}
39
40
lazy_static! {
    // Connections bound to an open transaction, keyed by `<default><thread id>`
    // (the key built in `query`, `execute`, `transaction`, `commit` and `rollback`).
    static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
    // Transaction nesting depth per thread, keyed by the thread id alone.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Cache of `table_info` results, keyed by `<default><table name>`.
    static ref TABLE_FIELDS: Arc<Mutex<HashMap<String, JsonValue>>> = Arc::new(Mutex::new(HashMap::new()));
}
/// PostgreSQL backend handle: connection settings plus a connection pool.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Configuration name; used as the prefix of transaction and table-cache keys.
    pub default: String,
    /// Per-handle parameters, initialised with `Params::default("pgsql")`.
    pub params: Params,
    /// Connection pool used for statements that run outside a transaction.
    pub client: Pools,
}
55
56impl Pgsql {
57 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
60 let port = connection.hostport.parse::<i32>()
61 .map_err(|e| format!("端口号解析失败: {} ({})", connection.hostport, e))?;
62
63 let cp_connection = connection.clone();
64 let config = object! {
65 debug: cp_connection.debug,
66 username: cp_connection.username,
67 userpass: cp_connection.userpass,
68 database: cp_connection.database,
69 hostname: cp_connection.hostname,
70 hostport: port,
71 charset: cp_connection.charset.str(),
72 };
73 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
74
75 let max_pools = config["pool_max"].as_u32()
78 .filter(|&p| p > 0 && p <= 1000)
79 .unwrap_or(400) as usize;
80
81 let constraints = PoolConstraints::new(0, max_pools)
82 .map_err(|e| format!("连接池约束配置失败: {}", e))?;
83
84 let pool_opts = PoolOpts::default()
86 .with_constraints(constraints)
87 .with_reset_connection(true)
88 .with_connect_timeout(Duration::from_secs(5))
89 .with_read_timeout(Duration::from_secs(15))
90 .with_write_timeout(Duration::from_secs(20))
91 .with_tcp_keepalive(Duration::from_secs(5));
92
93 let pools = pgsql.pools_with_opts(pool_opts)?;
95
96 Ok(Self {
97 connection,
98 default: default.clone(),
99 params: Params::default("pgsql"),
100 client: pools,
101 })
102 }
103
104 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
105 let thread_id = format!("{:?}", thread::current().id());
106 let key = format!("{}{}", self.default, thread_id);
107
108 let db_opt = {
111 let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
112 if trans_map.get(&*thread_id).is_some() {
113 let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
115 tr_map.get(&key).cloned()
116 } else {
117 None
118 }
119 };
120
121 if let Some(db) = db_opt {
122 let mut t = db.lock().unwrap();
124 match t.query(sql) {
125 Ok(e) => {
126 (true, e.rows)
128 }
129 Err(e) => {
130 error!(
131 "事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
132 thread_id,
133 self.default,
134 self.connection.database,
135 sql,
136 e
137 );
138 (false, JsonValue::from(e.to_string()))
139 }
140 }
141 } else {
142 let mut guard = match self.client.get_guard() {
144 Ok(g) => g,
145 Err(e) => {
146 error!(
147 "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
148 thread_id,
149 self.default,
150 self.connection.database,
151 sql,
152 e
153 );
154 return (false, JsonValue::from(e.to_string()));
155 }
156 };
157
158 let res = guard.conn().query(sql);
159 match res {
160 Ok(e) => {
161 (true, e.rows)
163 }
164 Err(e) => {
165 error!(
166 "非事务查询失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
167 thread_id,
168 self.default,
169 self.connection.database,
170 sql,
171 e
172 );
173 (false, JsonValue::from(e.to_string()))
174 }
175 }
176 }
178 }
179 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
180 let thread_id = format!("{:?}", thread::current().id());
181 let key = format!("{}{}", self.default, thread_id);
182
183
184 let db_opt = {
187 let trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
188 if trans_map.get(&*thread_id).is_some() {
189 let tr_map = TR.lock().unwrap_or_else(|e| e.into_inner());
191 tr_map.get(&key).cloned()
192 } else {
193 None
194 }
195 };
196
197 if let Some(db) = db_opt {
198 let mut t = db.lock().unwrap();
200 match t.execute(sql) {
201 Ok(e) => {
202 if sql.contains("INSERT") {
204 (true, e.rows)
205 } else {
206 (true, e.affect_count.into())
207 }
208 }
209 Err(e) => {
210 let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
211 "插入"
212 } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
213 "更新"
214 } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
215 "删除"
216 } else {
217 "执行"
218 };
219 error!(
220 "事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
221 operation_type,
222 thread_id,
223 self.default,
224 self.connection.database,
225 self.params.table,
226 sql,
227 e
228 );
229 (false, JsonValue::from(e.to_string()))
230 }
231 }
232 } else {
233 let mut guard = match self.client.get_guard() {
235 Ok(g) => g,
236 Err(e) => {
237 error!(
238 "获取数据库连接失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL语句: [{}] | 错误详情: {}",
239 thread_id,
240 self.default,
241 self.connection.database,
242 sql,
243 e
244 );
245 return (false, JsonValue::from(e.to_string()));
246 }
247 };
248
249 let res = guard.conn().execute(sql);
250 match res {
251 Ok(e) => {
252 if sql.contains("INSERT") {
254 (true, e.rows)
255 } else {
256 (true, e.affect_count.into())
257 }
258 }
259 Err(e) => {
260 let operation_type = if sql.trim_start().to_uppercase().starts_with("INSERT") {
261 "插入"
262 } else if sql.trim_start().to_uppercase().starts_with("UPDATE") {
263 "更新"
264 } else if sql.trim_start().to_uppercase().starts_with("DELETE") {
265 "删除"
266 } else {
267 "执行"
268 };
269 error!(
270 "非事务{}失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | 表名: {} | SQL语句: [{}] | 错误详情: {}",
271 operation_type,
272 thread_id,
273 self.default,
274 self.connection.database,
275 self.params.table,
276 sql,
277 e
278 );
279 (false, JsonValue::from(e.to_string()))
280 }
281 }
282 }
284 }
285}
286
287impl DbMode for Pgsql {
288 fn database_tables(&mut self) -> JsonValue {
289 let sql = "SELECT table_name FROM information_schema.tables
290 WHERE table_schema = 'public' AND table_type = 'BASE TABLE'".to_string();
291 match self.sql(sql.as_str()) {
292 Ok(e) => {
293 let mut list = vec![];
294 for item in e.members() {
295 if let Some(value) = item["table_name"].as_str() {
296 list.push(JsonValue::from(value));
297 }
298 }
299 list.into()
300 }
301 Err(_) => {
302 array![]
303 }
304 }
305 }
306
307 fn database_create(&mut self, name: &str) -> bool {
308 let check_sql = format!(
310 "SELECT 1 FROM pg_database WHERE datname = '{}'",
311 name
312 );
313 let (exists_state, exists_data) = self.query(check_sql.as_str());
314
315 if exists_state && !exists_data.is_empty() && exists_data.members().count() > 0 {
316 return true;
318 }
319
320 let db_name = format!("\"{}\"", name);
322
323 let mut sql = format!("CREATE DATABASE {}", db_name);
325
326 if !self.connection.charset.str().is_empty() {
328 let charset_str = self.connection.charset.str();
329 let encoding = match charset_str.as_str() {
330 "utf8" | "utf8mb4" => "UTF8",
331 "latin1" => "LATIN1",
332 _ => "UTF8",
333 };
334 sql = format!("{} ENCODING '{}'", sql, encoding);
335 }
336
337 let (state, data) = self.execute(sql.as_str());
338 match state {
339 true => true, false => {
341 error!("创建数据库失败: {data:?}");
342 false
343 }
344 }
345 }
346
347 fn database_update(&mut self, name: &str, options: JsonValue) -> bool {
348 let db_name = format!("\"{}\"", name);
349 let mut sql_parts = vec![];
350
351 if options.has_key("encoding") {
353 let encoding = options["encoding"].as_str().unwrap_or("UTF8");
354 sql_parts.push(format!("ENCODING '{}'", encoding));
355 }
356
357 if options.has_key("owner") {
359 let owner = options["owner"].as_str().unwrap_or("");
360 sql_parts.push(format!("OWNER = \"{}\"", owner));
361 }
362
363 if sql_parts.is_empty() {
364 return true; }
366
367 let sql = format!("ALTER DATABASE {} {}", db_name, sql_parts.join(" "));
368 let (state, data) = self.execute(sql.as_str());
369
370 match state {
371 true => true,
372 false => {
373 error!("更新数据库失败: {data:?}");
374 false
375 }
376 }
377 }
378}
379
380impl Mode for Pgsql {
381 fn transaction(&mut self) -> bool {
382 let thread_id = format!("{:?}", thread::current().id());
383
384 let is_nested = {
386 let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
387 if let Some(count) = trans_map.get_mut(&*thread_id) {
388 *count += 1;
390 true
391 } else {
392 trans_map.insert(thread_id.clone(), 1);
394 false
395 }
396 };
397
398 if is_nested {
399 return true;
400 }
401
402 let key = format!("{}{}", self.default, thread_id);
403
404 let mut guard = match self.client.get_guard() {
406 Ok(g) => g,
407 Err(e) => {
408 error!("获取事务连接失败: {e}");
409 TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&*thread_id);
411 return false;
412 }
413 };
414
415 let conn = guard.conn().clone();
416 drop(guard);
417
418 TR.lock().unwrap_or_else(|e| e.into_inner()).insert(key.clone(), Arc::new(Mutex::new(conn)));
419
420 let sql = "START TRANSACTION;".to_string();
421 let (state, _) = self.execute(sql.as_str());
422 match state {
423 true => {
424 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
425 let (state, _) = self.execute(sql.as_str());
426 match state {
427 true => state,
428 false => {
429 TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&*thread_id);
431 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
432 state
433 }
434 }
435 }
436 false => {
437 TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&*thread_id);
439 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
440 state
441 }
442 }
443 }
444
445 fn commit(&mut self) -> bool {
446 let thread_id = format!("{:?}", thread::current().id());
447 let key = format!("{}{}", self.default, thread_id);
448
449 let should_commit = {
451 let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
452 if let Some(count) = trans_map.get_mut(&*thread_id) {
453 if *count > 1 {
454 *count -= 1;
456 false } else {
458 true
460 }
461 } else {
462 error!("提交事务失败: 线程ID: {thread_id} 事务不存在");
464 return false;
465 }
466 };
467
468 if !should_commit {
469 return true;
470 }
471
472 let sql = "COMMIT".to_string();
474 let (state, data) = self.execute(sql.as_str());
475
476 if state {
478 TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&thread_id);
480 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
481 } else {
482 let error_msg = data.as_str().map(|s| s.to_string()).unwrap_or_else(|| data.to_string());
483 error!(
484 "提交事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
485 thread_id,
486 self.default,
487 self.connection.database,
488 sql,
489 error_msg
490 );
491 }
493 state
494 }
495
496 fn rollback(&mut self) -> bool {
497 let thread_id = format!("{:?}", thread::current().id());
498 let key = format!("{}{}", self.default, thread_id);
499
500 let should_rollback = {
502 let mut trans_map = TRANS.lock().unwrap_or_else(|e| e.into_inner());
503 if let Some(count) = trans_map.get_mut(&*thread_id) {
504 if *count > 1 {
505 *count -= 1;
507 false } else {
509 true
511 }
512 } else {
513 error!("回滚事务失败: 线程ID: {thread_id} 事务不存在");
515 return false;
516 }
517 };
518
519 if !should_rollback {
520 return true;
521 }
522
523 let sql = "ROLLBACK".to_string();
525 let (state, data) = self.execute(sql.as_str());
526
527 TRANS.lock().unwrap_or_else(|e| e.into_inner()).remove(&thread_id);
529 TR.lock().unwrap_or_else(|e| e.into_inner()).remove(&key);
530
531 if !state {
532 let error_msg = data.as_str().map(|s| s.to_string()).unwrap_or_else(|| data.to_string());
533 error!(
534 "回滚事务失败 - 线程ID: {} | 数据库配置: {} | 数据库名: {} | SQL: {} | 错误详情: {}",
535 thread_id,
536 self.default,
537 self.connection.database,
538 sql,
539 error_msg
540 );
541 }
542 state
543 }
544
545 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
546 let (state, data) = self.query(sql);
547 match state {
548 true => Ok(data),
549 false => Err(data.to_string()),
550 }
551 }
552
553 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
554 let (state, data) = self.execute(sql);
555 match state {
556 true => Ok(data),
557 false => Err(data.to_string()),
558 }
559 }
560
561 fn table_create(&mut self, options: TableOptions) -> JsonValue {
562 let cache_key = format!("{}{}", self.default, options.table_name);
564 if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
565 TABLE_FIELDS.lock().unwrap().remove(&cache_key);
566 }
567
568 let mut sql = String::new();
569 let mut unique = String::new();
570 let mut index = String::new();
571
572 let mut unique_fields = String::new();
574 for item in options.table_unique.iter() {
575 if unique_fields.is_empty() {
576 unique_fields = format!("\"{}\"", item);
577 } else {
578 unique_fields = format!("{}, \"{}\"", unique_fields, item);
579 }
580 }
581 if !unique_fields.is_empty() {
582 let mut unique_name = String::new();
583 for item in options.table_unique.iter() {
584 if unique_name.is_empty() {
585 unique_name = format!("{}_unique_{}", options.table_name, item);
586 } else {
587 unique_name = format!("{}_{}", unique_name, item);
588 }
589 }
590 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
591 unique_name = generate_index_name(&format!("unique_{}", md5));
592 unique = format!("CONSTRAINT {} UNIQUE ({})", unique_name, unique_fields);
593 }
594
595 for row in options.table_index.iter() {
597 let mut index_fields = String::new();
598 let mut index_name = String::new();
599 for item in row {
600 if index_fields.is_empty() {
601 index_fields = format!("\"{}\"", item);
602 index_name = format!("{}_index_{}", options.table_name, item);
603 } else {
604 index_fields = format!("{}, \"{}\"", index_fields, item);
605 index_name = format!("{}_{}", index_name, item);
606 }
607 }
608 let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
609 index_name = generate_index_name(&format!("index_{}", md5));
610 if index.is_empty() {
611 index = format!("CREATE INDEX {} ON {} ({});", index_name, options.table_name, index_fields);
612 } else {
613 index = format!("{};\r\nCREATE INDEX {} ON {} ({});", index, index_name, options.table_name, index_fields);
614 }
615 }
616
617 for (name, field) in options.table_fields.entries() {
619 let row = br_fields::field("pgsql", name, field.clone());
620 sql = format!("{sql} {row},\r\n");
621 }
622
623 if !unique.is_empty() {
625 sql = sql.trim_end_matches(",\r\n").to_string();
626 sql = format!("{sql},\r\n{unique}");
627 }
628 sql = if sql.trim_end().ends_with(",") {
629 format!("{}\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
630 } else {
631 format!("{},\r\nPRIMARY KEY(\"{}\")", sql, options.table_key)
632 };
633
634 let create_sql = format!(
636 "CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n);\r\n{}",
637 options.table_name, sql, index
638 );
639
640 if self.params.sql {
641 return JsonValue::from(create_sql);
642 }
643
644
645 let (state, data) = self.execute(create_sql.as_str());
647
648
649
650 match state {
651 true => JsonValue::from(true),
652 false => {
653 error!("创建表错误: {} - {}", options.table_name, data);
654 JsonValue::from(false)
655 }
656 }
657 }
658
659 fn table_update(&mut self, options: TableOptions) -> JsonValue {
660 if !self.table_is_exist(&options.table_name) {
662 warn!("表 {} 不存在,尝试创建表", options.table_name);
664 return self.table_create(options);
665 }
666
667 let cache_key = format!("{}{}", self.default, options.table_name);
669 if TABLE_FIELDS.lock().unwrap().get(&cache_key).is_some() {
670 TABLE_FIELDS.lock().unwrap().remove(&cache_key);
671 }
672
673 let mut sql = vec![];
674 let fields_list = self.table_info(&options.table_name);
675 let mut put = vec![];
676 let mut add = vec![];
677 let mut del = vec![];
678
679 for (key, _) in fields_list.entries() {
681 if options.table_fields[key].is_empty() {
682 del.push(key);
683 }
684 }
685
686 for (name, field) in options.table_fields.entries() {
688 if !fields_list[name].is_empty() {
689 let old_field = &fields_list[name];
691
692 let new_title = field["title"].as_str().unwrap_or("");
694
695 let old_comment = old_field["comment"].as_str().unwrap_or("");
698 let old_title = if !old_comment.is_empty() {
699 old_comment.split('|').next().unwrap_or("")
700 } else {
701 ""
702 };
703
704 let new_field_title = format!("{}|{}", name, new_title);
706 let old_field_title = format!("{}|{}", name, old_title);
707
708 if new_field_title != old_field_title {
710 put.push(name);
711 continue;
712 }
713
714 continue;
716 } else {
717 add.push(name);
719 }
720 }
721
722 if add.is_empty() && del.is_empty() && put.is_empty() {
724 info!("数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新", options.table_name);
725 return JsonValue::from(-1);
726 }
727
728 for name in add.iter() {
730 let name = name.to_string();
731 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
732 sql.push(format!("ALTER TABLE {} ADD COLUMN {};\r\n", options.table_name, row));
733 }
734
735 for name in del.iter() {
736 sql.push(format!("ALTER TABLE {} DROP COLUMN \"{}\";\r\n", options.table_name, name));
737 }
738
739 for name in put.iter() {
740 let name = name.to_string();
741 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
742
743 let comment_parts: Vec<&str> = row.split(" comment ").collect();
745 let field_type_part = if comment_parts.len() > 1 {
746 comment_parts[0].trim()
747 } else {
748 row.trim()
749 };
750
751 let type_def = if let Some(space_pos) = field_type_part.find(' ') {
754 &field_type_part[space_pos + 1..]
755 } else {
756 field_type_part
757 };
758
759 sql.push(format!("ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
761 options.table_name, name, type_def));
762
763 if comment_parts.len() > 1 {
765 let comment = comment_parts[1].trim_start_matches("'").trim_end_matches("'");
766 sql.push(format!(
767 "COMMENT ON COLUMN {}.{} IS '{}';\r\n",
768 options.table_name, name, comment
769 ));
770 }
771 }
772
773 let mut unique_fields = String::new();
775 let mut unique_name = String::new();
776 for item in options.table_unique.iter() {
777 if unique_fields.is_empty() {
778 unique_fields = format!("\"{}\"", item);
779 unique_name = format!("{}_unique_{}", options.table_name, item);
780 } else {
781 unique_fields = format!("{}, \"{}\"", unique_fields, item);
782 unique_name = format!("{}_{}", unique_name, item);
783 }
784 }
785
786 if !unique_name.is_empty() {
787 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
788 unique_name = generate_index_name(&format!("unique_{}", md5));
789
790 let (_, index_list) = self.query(format!(
792 "SELECT indexname FROM pg_indexes WHERE tablename = '{}' AND indexdef LIKE '%UNIQUE%'",
793 options.table_name
794 ).as_str());
795
796 let mut unique_new = vec![];
797 for item in index_list.members() {
798 if let Some(index_name) = item["indexname"].as_str() {
799 unique_new.push(index_name.to_string());
800 }
801 }
802
803 for item in &unique_new {
805 if unique_name != *item {
806 sql.push(format!("DROP INDEX IF EXISTS {};\r\n", item));
807 }
808 }
809
810 if !unique_new.contains(&unique_name) {
812 sql.push(format!(
813 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});\r\n",
814 unique_name, options.table_name, unique_fields
815 ));
816 }
817 }
818
819 let mut index_list = vec![];
821 for row in options.table_index.iter() {
822 let mut index_fields = String::new();
823 let mut index_name = String::new();
824 for item in row {
825 if index_fields.is_empty() {
826 index_fields = format!("\"{}\"", item);
827 index_name = format!("{}_index_{}", options.table_name, item);
828 } else {
829 index_fields = format!("{}, \"{}\"", index_fields, item);
830 index_name = format!("{}_{}", index_name, item);
831 }
832 }
833 let md5 = br_crypto::md5::encrypt_hex(index_name.as_bytes());
834 index_name = generate_index_name(&format!("index_{}", md5));
835 index_list.push(index_name.clone());
836
837 let (_, existing_indexes) = self.query(format!(
839 "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
840 options.table_name
841 ).as_str());
842
843 let mut existing_index_names = vec![];
844 for item in existing_indexes.members() {
845 if let Some(index_name) = item["indexname"].as_str() {
846 existing_index_names.push(index_name.to_string());
847 }
848 }
849
850 if !existing_index_names.contains(&index_name) {
851 sql.push(format!(
852 "CREATE INDEX IF NOT EXISTS {} ON {} ({});\r\n",
853 index_name, options.table_name, index_fields
854 ));
855 }
856 }
857
858 let (_, all_indexes) = self.query(format!(
860 "SELECT indexname FROM pg_indexes WHERE tablename = '{}'",
861 options.table_name
862 ).as_str());
863
864 for item in all_indexes.members() {
865 if let Some(index_name) = item["indexname"].as_str() {
866 if !index_name.contains("_pkey") && !index_list.contains(&index_name.to_string()) {
868 if index_name.starts_with(&format!("{}_", options.table_name)) {
870 sql.push(format!("DROP INDEX IF EXISTS {};\r\n", index_name));
871 }
872 }
873 }
874 }
875
876 if self.params.sql {
877 return JsonValue::from(sql.join(""));
878 }
879
880 if sql.is_empty() {
882 info!("数据库更新情况: {} 成功 更新前检查字段是否有变化,没有变化的不需要更新", options.table_name);
883 return JsonValue::from(-1);
884 }
885
886 for item in sql.iter() {
888 let (state, res) = self.execute(item.as_str());
889 match state {
890 true => {}
891 false => {
892 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
893 return JsonValue::from(0);
894 }
895 }
896 }
897
898 JsonValue::from(1)
899 }
900
901 fn table_info(&mut self, table: &str) -> JsonValue {
902 let cache_key = format!("{}{}", self.default, table);
903 {
904 let fields = TABLE_FIELDS.lock().unwrap();
905 if let Some(cached) = fields.get(&cache_key) {
906 return cached.clone();
907 }
908 }
909
910 let sql = format!(
912 "SELECT
913 COL.COLUMN_NAME,
914 COL.DATA_TYPE,
915 COL.UDT_NAME,
916 COL.CHARACTER_MAXIMUM_LENGTH,
917 COL.NUMERIC_PRECISION,
918 COL.NUMERIC_SCALE,
919 COL.COLUMN_DEFAULT,
920 COL.IS_NULLABLE,
921 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT
922 FROM INFORMATION_SCHEMA.COLUMNS COL
923 LEFT JOIN pg_catalog.pg_description DESCRIPTION
924 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
925 AND DESCRIPTION.objoid = (
926 SELECT oid FROM pg_catalog.pg_class
927 WHERE relname = COL.TABLE_NAME
928 AND relnamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = 'public')
929 LIMIT 1
930 )
931 WHERE COL.TABLE_SCHEMA = 'public'
932 AND COL.TABLE_NAME = '{}'",
933 table
934 );
935
936 let (state, data) = self.query(sql.as_str());
937 let mut list = object! {};
938
939 if state {
940 for item in data.members() {
941 if let Some(field_name) = item["COLUMN_NAME"].as_str() {
942 let mut row = object! {};
943 row["field"] = JsonValue::from(field_name);
944 row["type"] = item["DATA_TYPE"].clone();
945 row["udt_name"] = item["UDT_NAME"].clone();
946 row["comment"] = item["COMMENT"].clone();
947 row["is_nullable"] = item["IS_NULLABLE"].clone();
948 row["column_default"] = item["COLUMN_DEFAULT"].clone();
949 row["character_maximum_length"] = item["CHARACTER_MAXIMUM_LENGTH"].clone();
950 row["numeric_precision"] = item["NUMERIC_PRECISION"].clone();
951 row["numeric_scale"] = item["NUMERIC_SCALE"].clone();
952 list[field_name] = row;
953 }
954 }
955 let list_clone = list.clone();
956 TABLE_FIELDS.lock().unwrap().insert(cache_key, list_clone);
957 }
958 list
959 }
960
961 fn table_is_exist(&mut self, name: &str) -> bool {
962 let sql = format!(
963 "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{}'",
964 name
965 );
966 let (state, data) = self.query(sql.as_str());
967 match state {
968 true => {
969 if !data.is_empty() {
970 if let Some(count) = data[0]["count"].as_i64() {
971 return count > 0;
972 }
973 }
974 false
975 }
976 false => false,
977 }
978 }
979
980 fn table(&mut self, _name: &str) -> &mut Pgsql {
981 self
982 }
983
984 fn change_table(&mut self, _name: &str) -> &mut Self {
985 self
986 }
987
988 fn autoinc(&mut self) -> &mut Self {
989 self
990 }
991
992 fn fetch_sql(&mut self) -> &mut Self {
993 self
994 }
995
996 fn order(&mut self, _field: &str, _by: bool) -> &mut Self {
997 self
998 }
999
1000 fn group(&mut self, _field: &str) -> &mut Self {
1001 self
1002 }
1003
1004 fn distinct(&mut self) -> &mut Self {
1005 self
1006 }
1007
1008 fn json(&mut self, _field: &str) -> &mut Self {
1009 self
1010 }
1011
1012 fn location(&mut self, _field: &str) -> &mut Self {
1013 self
1014 }
1015
1016 fn field(&mut self, _field: &str) -> &mut Self {
1017 self
1018 }
1019
1020 fn hidden(&mut self, _name: &str) -> &mut Self {
1021 self
1022 }
1023
1024 fn where_and(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1025 self
1026 }
1027
1028 fn where_or(&mut self, _field: &str, _compare: &str, _value: JsonValue) -> &mut Self {
1029 self
1030 }
1031
1032 fn where_column(&mut self, _field_a: &str, _compare: &str, _field_b: &str) -> &mut Self {
1033 self
1034 }
1035
1036 fn update_column(&mut self, _field_a: &str, _compare: &str) -> &mut Self {
1037 self
1038 }
1039
1040 fn page(&mut self, _page: i32, _limit: i32) -> &mut Self {
1041 self
1042 }
1043
1044 fn column(&mut self, _field: &str) -> JsonValue {
1045 JsonValue::Null
1046 }
1047
1048 fn count(&mut self) -> JsonValue {
1049 JsonValue::from(0)
1050 }
1051
1052 fn max(&mut self, _field: &str) -> JsonValue {
1053 JsonValue::from(0)
1054 }
1055
1056 fn min(&mut self, _field: &str) -> JsonValue {
1057 JsonValue::from(0)
1058 }
1059
1060 fn sum(&mut self, _field: &str) -> JsonValue {
1061 JsonValue::from(0)
1062 }
1063
1064 fn avg(&mut self, _field: &str) -> JsonValue {
1065 JsonValue::from(0)
1066 }
1067
1068 fn select(&mut self) -> JsonValue {
1069 array![]
1070 }
1071
1072 fn find(&mut self) -> JsonValue {
1073 object! {}
1074 }
1075
1076 fn value(&mut self, _field: &str) -> JsonValue {
1077 JsonValue::Null
1078 }
1079
1080 fn insert(&mut self, _data: JsonValue) -> JsonValue {
1081 JsonValue::from("")
1082 }
1083
1084 fn insert_all(&mut self, _data: JsonValue) -> JsonValue {
1085 array![]
1086 }
1087
1088 fn update(&mut self, _data: JsonValue) -> JsonValue {
1089 JsonValue::from(0)
1090 }
1091
1092 fn update_all(&mut self, _data: JsonValue) -> JsonValue {
1093 JsonValue::from(0)
1094 }
1095
1096 fn delete(&mut self) -> JsonValue {
1097 JsonValue::from(0)
1098 }
1099
1100 fn inc(&mut self, _field: &str, _num: f64) -> &mut Self {
1101 self
1102 }
1103
1104 fn dec(&mut self, _field: &str, _num: f64) -> &mut Self {
1105 self
1106 }
1107
1108 fn buildsql(&mut self) -> String {
1109 String::new()
1110 }
1111
1112 fn join_fields(&mut self, _fields: Vec<&str>) -> &mut Self {
1113 self
1114 }
1115
1116 fn join(&mut self, _main_table: &str, _main_fields: &str, _right_table: &str, _right_fields: &str) -> &mut Self {
1117 self
1118 }
1119
1120 fn join_inner(&mut self, _table: &str, _main_fields: &str, _second_fields: &str) -> &mut Self {
1121 self
1122 }
1123}