1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use crate::TABLE_FIELDS;
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
12#[derive(Clone)]
13pub struct Pgsql {
14 pub connection: Connection,
16 pub default: String,
18 pub params: Params,
19 pub client: Pools,
20}
21
22impl Pgsql {
23 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24 let port = connection
25 .hostport
26 .parse::<i32>()
27 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29 let cp_connection = connection.clone();
30 let config = object! {
31 debug: cp_connection.debug,
32 username: cp_connection.username,
33 userpass: cp_connection.userpass,
34 database: cp_connection.database,
35 hostname: cp_connection.hostname,
36 hostport: port,
37 charset: cp_connection.charset.str(),
38 pool_max: cp_connection.pool.max_connections,
39 };
40 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
41
42 let pools = pgsql.pools()?;
43 Ok(Self {
44 connection,
45 default: default.clone(),
46 params: Params::default("pgsql"),
47 client: pools,
48 })
49 }
50
51 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
52 let thread_id = format!("{:?}", thread::current().id());
53 let key = format!("{}{}", self.default, thread_id);
54
55 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
56 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
57
58 match result {
59 Some(Ok(e)) => {
60 if self.connection.debug {
61 info!("查询成功: {} {}", thread_id.clone(), sql);
62 }
63 (true, e.rows)
64 }
65 Some(Err(e)) => {
66 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
67 (false, JsonValue::from(e.to_string()))
68 }
69 None => {
70 error!("事务查询失败: 未找到事务连接 {thread_id}");
71 (false, JsonValue::from("未找到事务连接"))
72 }
73 }
74 } else {
75 let mut guard = match self.client.get_guard() {
76 Ok(g) => g,
77 Err(e) => {
78 error!(
80 "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
81 );
82 return (false, JsonValue::from(e.to_string()));
83 }
84 };
85 match guard.conn().query(sql) {
86 Ok(e) => {
87 if self.connection.debug {
88 info!("查询成功: {} {}", thread_id.clone(), sql);
89 }
90 (true, e.rows)
91 }
92 Err(ref e) if Self::is_retriable_error(e) => {
93 guard.discard();
96 self.client.flush_idle();
98 warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
99 thread::sleep(std::time::Duration::from_millis(200));
100 let mut guard2 = match self.client.get_guard() {
101 Ok(g) => g,
102 Err(e) => {
103 error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
104 return (false, JsonValue::from(e.to_string()));
105 }
106 };
107 match guard2.conn().query(sql) {
108 Ok(e) => {
109 if self.connection.debug {
110 info!("查询成功(重试): {} {}", thread_id.clone(), sql);
111 }
112 (true, e.rows)
113 }
114 Err(e) => {
115 error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
116 (false, JsonValue::from(e.to_string()))
117 }
118 }
119 }
120 Err(e) => {
121 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
122 (false, JsonValue::from(e.to_string()))
123 }
124 }
125 }
126 }
127 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
128 let thread_id = format!("{:?}", thread::current().id());
129 let key = format!("{}{}", self.default, thread_id);
130
131 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
132 if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
134 &self.params.table,
135 &thread_id,
136 std::time::Duration::from_secs(30),
137 ) {
138 error!("获取表锁超时: {} {}", self.params.table, thread_id);
139 return (false, JsonValue::from("table lock timeout"));
140 }
141 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
142
143 match result {
144 Some(Ok(e)) => {
145 if self.connection.debug {
146 info!("提交成功: {} {}", thread_id.clone(), sql);
147 }
148 if sql.contains("INSERT") {
149 (true, e.rows)
150 } else {
151 (true, e.affect_count.into())
152 }
153 }
154 Some(Err(e)) => {
155 error!("事务提交失败: {thread_id} {e}");
156 (false, JsonValue::from(e.to_string()))
157 }
158 None => {
159 error!("事务执行失败: 未找到事务连接 {thread_id}");
160 (false, JsonValue::from("未找到事务连接"))
161 }
162 }
163 } else {
164 let mut guard = match self.client.get_guard() {
166 Ok(g) => g,
167 Err(e) => {
168 error!(
169 "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
170 );
171 return (false, JsonValue::from(e.to_string()));
172 }
173 };
174 match guard.conn().execute(sql) {
175 Ok(e) => {
176 if self.connection.debug {
177 info!("提交成功: {} {}", thread_id.clone(), sql);
178 }
179 if sql.contains("INSERT") {
180 (true, e.rows)
181 } else {
182 (true, e.affect_count.into())
183 }
184 }
185 Err(ref e) if Self::is_retriable_error(e) => {
186 guard.discard();
189 self.client.flush_idle();
191 warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
192 thread::sleep(std::time::Duration::from_millis(200));
193 let mut guard2 = match self.client.get_guard() {
194 Ok(g) => g,
195 Err(e) => {
196 error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
197 return (false, JsonValue::from(e.to_string()));
198 }
199 };
200 match guard2.conn().execute(sql) {
201 Ok(e) => {
202 if self.connection.debug {
203 info!("提交成功(重试): {} {}", thread_id.clone(), sql);
204 }
205 if sql.contains("INSERT") {
206 (true, e.rows)
207 } else {
208 (true, e.affect_count.into())
209 }
210 }
211 Err(e) => {
212 error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
213 (false, JsonValue::from(e.to_string()))
214 }
215 }
216 }
217 Err(e) => {
218 error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
219 (false, JsonValue::from(e.to_string()))
220 }
221 }
222 }
223 }
224
225 fn is_retriable_error(e: &PgsqlError) -> bool {
227 matches!(
228 e,
229 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
230 )
231 }
232}
233
234impl DbMode for Pgsql {
235 fn database_tables(&mut self) -> JsonValue {
236 let sql = "SHOW TABLES".to_string();
237 match self.sql(sql.as_str()) {
238 Ok(e) => {
239 let mut list = vec![];
240 for item in e.members() {
241 for (_, value) in item.entries() {
242 list.push(value.clone());
243 }
244 }
245 list.into()
246 }
247 Err(_) => {
248 array![]
249 }
250 }
251 }
252
253 fn database_create(&mut self, name: &str) -> bool {
254 let sql = format!("CREATE DATABASE {name}");
255
256 let (state, data) = self.execute(sql.as_str());
257 match state {
258 true => data.as_bool().unwrap_or(true),
259 false => {
260 error!("创建数据库失败: {data:?}");
261 false
262 }
263 }
264 }
265
266 fn truncate(&mut self, table: &str) -> bool {
267 let sql = format!("TRUNCATE TABLE {table}");
268 let (state, _) = self.execute(sql.as_str());
269 state
270 }
271}
272
273impl Mode for Pgsql {
274 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
275 let mut sql = String::new();
276 let mut comments = vec![];
277
278 if !options.table_unique.is_empty() {
279 let full_name = format!(
280 "{}_unique_{}",
281 options.table_name,
282 options.table_unique.join("_")
283 );
284 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
285 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
286 let unique = format!(
287 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
288 name,
289 options.table_name,
290 options.table_unique.join(",")
291 );
292 comments.push(unique);
293 }
294
295 for row in options.table_index.iter() {
296 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
297 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
298 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
299 let index = format!(
300 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
301 name,
302 options.table_name,
303 row.join(",")
304 );
305 comments.push(index);
306 }
307
308 for (name, field) in options.table_fields.entries_mut() {
309 field["table_name"] = options.table_name.clone().into();
310 let row = br_fields::field("pgsql", name, field.clone());
311 let (col_sql, meta) = if let Some(idx) = row.find("--") {
312 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
313 } else {
314 (row.trim(), None)
315 };
316 if let Some(meta) = meta {
317 comments.push(format!(
318 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
319 options.table_name, name, meta
320 ));
321 }
322 sql = format!("{} {},\r\n", sql, col_sql);
323 }
324
325 let primary_key = format!(
326 "CONSTRAINT {}_{} PRIMARY KEY ({})",
327 options.table_name, options.table_key, options.table_key
328 );
329 let sql = format!(
330 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
331 options.table_name,
332 sql.trim_end_matches(",\r\n"),
333 primary_key
334 );
335 comments.insert(0, sql);
336
337 for (_name, field) in options.table_fields.entries() {
338 let _ = field["mode"].as_str();
339 }
340
341 if self.params.sql {
342 let info = comments.join("\r\n");
343 return JsonValue::from(info);
344 }
345 for comment in comments {
346 let (state, _) = self.execute(comment.as_str());
347 match state {
348 true => {}
349 false => {
350 return JsonValue::from(state);
351 }
352 }
353 }
354 JsonValue::from(true)
355 }
356
357 fn table_update(&mut self, options: TableOptions) -> JsonValue {
358 let cache_key = format!("{}{}", self.default, options.table_name);
360 let table_fields_guard = match TABLE_FIELDS.read() {
361 Ok(g) => g,
362 Err(e) => e.into_inner(),
363 };
364 if table_fields_guard.get(&cache_key).is_some() {
365 drop(table_fields_guard);
366 let mut table_fields_guard = match TABLE_FIELDS.write() {
367 Ok(g) => g,
368 Err(e) => e.into_inner(),
369 };
370 table_fields_guard.remove(&cache_key);
371 } else {
372 drop(table_fields_guard);
373 }
374 let fields_list = self.table_info(&options.table_name);
375 let mut put = vec![];
376 let mut add = vec![];
377 let mut del = vec![];
378 let mut comments = vec![];
379
380 for (key, _) in fields_list.entries() {
381 if options.table_fields[key].is_empty() {
382 del.push(key);
383 }
384 }
385 for (name, field) in options.table_fields.entries() {
386 if !fields_list[name].is_empty() {
387 let old_info = &fields_list[name];
388 let new_field_sql = br_fields::field("pgsql", name, field.clone());
389
390 let old_comment = old_info["comment"].as_str().unwrap_or("");
391 let old_type = old_info["type"].as_str().unwrap_or("");
392
393 let new_comment = if let Some(idx) = new_field_sql.find("--") {
394 new_field_sql[idx + 2..].trim()
395 } else {
396 ""
397 };
398
399 let comment_matches =
400 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
401 let old_parts: Vec<&str> = old_comment.split('|').collect();
402 let new_parts: Vec<&str> = new_comment.split('|').collect();
403 if old_parts.len() >= 4 && new_parts.len() >= 4 {
404 old_parts[..4] == new_parts[..4]
405 } else {
406 old_comment == new_comment
407 }
408 } else if !old_comment.is_empty() && !new_comment.is_empty() {
409 let old_parts: Vec<&str> = old_comment.split('|').collect();
410 let new_parts: Vec<&str> = new_comment.split('|').collect();
411 if old_parts.len() >= 2
412 && new_parts.len() >= 2
413 && old_parts.len() == new_parts.len()
414 {
415 let old_filtered: Vec<&str> = old_parts
416 .iter()
417 .filter(|v| **v != "true" && **v != "false")
418 .copied()
419 .collect();
420 let new_filtered: Vec<&str> = new_parts
421 .iter()
422 .filter(|v| **v != "true" && **v != "false")
423 .copied()
424 .collect();
425 old_filtered == new_filtered
426 } else {
427 old_comment == new_comment
428 }
429 } else {
430 old_comment == new_comment
431 };
432
433 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
434 let new_type = if sql_parts.len() > 1 {
435 sql_parts[1].to_lowercase()
436 } else {
437 String::new()
438 };
439
440 let type_matches = match old_type {
441 "integer" => {
442 new_type.contains("int")
443 && !new_type.contains("bigint")
444 && !new_type.contains("smallint")
445 }
446 "bigint" => new_type.contains("bigint"),
447 "smallint" => new_type.contains("smallint"),
448 "boolean" => new_type.contains("boolean"),
449 "text" => new_type.contains("text"),
450 "character varying" => {
451 if !new_type.contains("varchar") {
452 false
453 } else {
454 let old_len = old_info["max_length"].as_i64().unwrap_or(0);
455 let new_len = new_type
456 .trim_start_matches("varchar(")
457 .trim_end_matches(')')
458 .parse::<i64>()
459 .unwrap_or(0);
460 let matched = old_len == new_len || new_len == 0;
461 if !matched {
462 log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
463 }
464 old_len == new_len || new_len == 0
465 }
466 }
467 "character" => new_type.contains("char") && !new_type.contains("varchar"),
468 "numeric" => {
469 if !(new_type.contains("numeric") || new_type.contains("decimal")) {
470 false
471 } else {
472 let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
473 let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
474 let inner = new_type
475 .replace("numeric(", "")
476 .replace("decimal(", "")
477 .replace(')', "");
478 let parts: Vec<&str> = inner.split(',').collect();
479 let new_prec = parts
480 .first()
481 .and_then(|s| s.trim().parse::<i64>().ok())
482 .unwrap_or(0);
483 let new_scale = parts
484 .get(1)
485 .and_then(|s| s.trim().parse::<i64>().ok())
486 .unwrap_or(0);
487 old_prec == new_prec && old_scale == new_scale
488 }
489 }
490 "double precision" => {
491 new_type.contains("double") || new_type.contains("float8")
492 }
493 "real" => new_type.contains("real") || new_type.contains("float4"),
494 "timestamp without time zone" | "timestamp with time zone" => {
495 new_type.contains("timestamp")
496 }
497 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
498 "time without time zone" | "time with time zone" => {
499 new_type.contains("time") && !new_type.contains("timestamp")
500 }
501 "json" | "jsonb" => new_type.contains("json"),
502 "uuid" => new_type.contains("uuid"),
503 "bytea" => new_type.contains("bytea"),
504 _ => old_type == new_type,
505 };
506
507 if type_matches && comment_matches {
508 continue;
509 }
510
511 log::debug!(
512 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
513 options.table_name,
514 name,
515 type_matches,
516 old_type,
517 new_type,
518 comment_matches
519 );
520 put.push(name);
521 } else {
522 add.push(name);
523 }
524 }
525
526 for name in add.iter() {
527 let name = name.to_string();
528 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
529 let rows = row.split("--").collect::<Vec<&str>>();
530 comments.push(format!(
531 r#"ALTER TABLE "{}" add {};"#,
532 options.table_name,
533 rows[0].trim()
534 ));
535 if rows.len() > 1 {
536 comments.push(format!(
537 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
538 options.table_name,
539 name,
540 rows[1].trim()
541 ));
542 }
543 }
544 for name in del.iter() {
545 comments.push(format!(
546 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
547 options.table_name, name
548 ));
549 }
550 for name in put.iter() {
551 let name = name.to_string();
552 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
553 let rows = row.split("--").collect::<Vec<&str>>();
554
555 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
556
557 if sql[1].contains("BOOLEAN") {
558 let text = format!(
559 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
560 options.table_name, name
561 );
562 comments.push(text.clone());
563 let text = format!(
564 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
565 options.table_name, name, sql[1]
566 );
567 comments.push(text.clone());
568 } else {
569 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
570 let new_type_lower = sql[1].to_lowercase();
571 let is_date_to_numeric = (old_col_type == "date"
572 || old_col_type.contains("timestamp"))
573 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
574 if is_date_to_numeric {
575 comments.push(format!(
576 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
577 options.table_name, name
578 ));
579 comments.push(format!(
580 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
581 options.table_name, name, sql[1], name, name, name
582 ));
583 } else {
584 let text = format!(
585 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
586 options.table_name, name, sql[1]
587 );
588 comments.push(text.clone());
589 }
590 };
591
592 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
593 let default_value = rows[0][default_pos + 9..].trim();
594 if !default_value.is_empty() {
595 comments.push(format!(
596 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
597 options.table_name, name, default_value
598 ));
599 }
600 }
601 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
604 .as_str()
605 .unwrap_or("YES");
606 let old_is_required = old_is_nullable == "NO";
607
608 if old_is_required && name != options.table_key {
610 comments.push(format!(
611 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
612 options.table_name, name
613 ));
614 }
615
616 if rows.len() > 1 {
617 comments.push(format!(
618 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
619 options.table_name,
620 name,
621 rows[1].trim()
622 ));
623 }
624 }
625
626 let mut unique_new = vec![];
627 let mut index_new = vec![];
628 let mut primary_key = vec![];
629 let (_, index_list) = self.query(
630 format!(
631 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
632 options.table_name
633 )
634 .as_str(),
635 );
636 for item in index_list.members() {
637 let key_name = item["indexname"].as_str().unwrap_or("");
638 let indexdef = item["indexdef"].to_string();
639
640 if indexdef.contains(
641 format!(
642 "CREATE UNIQUE INDEX {}_{} ON",
643 options.table_name, options.table_key
644 )
645 .as_str(),
646 ) {
647 primary_key.push(key_name.to_string());
648 continue;
649 }
650 if indexdef.contains("CREATE UNIQUE INDEX") {
651 unique_new.push(key_name.to_string());
652 continue;
653 }
654 if indexdef.contains("CREATE INDEX") {
655 index_new.push(key_name.to_string());
656 continue;
657 }
658 }
659
660 if !options.table_unique.is_empty() {
661 let full_name = format!(
662 "{}_unique_{}",
663 options.table_name,
664 options.table_unique.join("_")
665 );
666 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
667 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
668 let unique = format!(
669 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
670 name,
671 options.table_name,
672 options.table_unique.join(",")
673 );
674 if !unique_new.contains(&name) {
675 comments.push(unique);
676 }
677 unique_new.retain(|x| *x != name);
678 }
679
680 for row in options.table_index.iter() {
681 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
682 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
683 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
684 let index = format!(
685 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
686 name,
687 options.table_name,
688 row.join(",")
689 );
690 if !index_new.contains(&name) {
691 comments.push(index);
692 }
693 index_new.retain(|x| *x != name);
694 }
695
696 for item in unique_new {
697 if item.ends_with("_pkey") {
698 continue;
699 }
700 if item.starts_with("unique_") {
701 comments.push(format!(
702 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
703 options.table_name,
704 item.clone()
705 ));
706 } else {
707 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
708 }
709 }
710 for item in index_new {
711 if item.ends_with("_pkey") {
712 continue;
713 }
714 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
715 }
716
717 if self.params.sql {
718 return JsonValue::from(comments.join(""));
719 }
720
721 if comments.is_empty() {
722 return JsonValue::from(-1);
723 }
724
725 for item in comments.iter() {
726 let (state, res) = self.execute(item.as_str());
727 match state {
728 true => {}
729 false => {
730 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
731 return JsonValue::from(0);
732 }
733 }
734 }
735 JsonValue::from(1)
736 }
737
738 fn table_info(&mut self, table: &str) -> JsonValue {
739 let cache_key = format!("{}{}", self.default, table);
741 let table_fields_guard = match TABLE_FIELDS.read() {
742 Ok(g) => g,
743 Err(e) => e.into_inner(),
744 };
745 if let Some(cached) = table_fields_guard.get(&cache_key) {
746 return cached.clone();
747 }
748 drop(table_fields_guard);
749 let sql = format!(
750 "SELECT COL.COLUMN_NAME,
751 COL.DATA_TYPE,
752 COL.IS_NULLABLE,
753 COL.CHARACTER_MAXIMUM_LENGTH,
754 COL.NUMERIC_PRECISION,
755 COL.NUMERIC_SCALE,
756 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
757 LEFT JOIN
758 pg_catalog.pg_description DESCRIPTION
759 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
760 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
761 let (state, data) = self.query(sql.as_str());
762 let mut list = object! {};
763 if state {
764 for item in data.members() {
765 let mut row = object! {};
766 row["field"] = item["column_name"].clone();
767 row["comment"] = item["comment"].clone();
768 row["type"] = item["data_type"].clone();
769 row["is_nullable"] = item["is_nullable"].clone();
770 row["max_length"] = item["character_maximum_length"].clone();
771 row["numeric_precision"] = item["numeric_precision"].clone();
772 row["numeric_scale"] = item["numeric_scale"].clone();
773 if let Some(field_name) = row["field"].as_str() {
774 list[field_name] = row.clone();
775 }
776 }
777 let mut table_fields_guard = match TABLE_FIELDS.write() {
779 Ok(g) => g,
780 Err(e) => e.into_inner(),
781 };
782 table_fields_guard.insert(cache_key, list.clone());
783 list
784 } else {
785 list
786 }
787 }
788
789 fn table_is_exist(&mut self, name: &str) -> bool {
790 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
791 let (state, data) = self.query(sql.as_str());
792 match state {
793 true => {
794 for item in data.members() {
795 if item.has_key("exists") {
796 return item["exists"].as_bool().unwrap_or(false);
797 }
798 }
799 false
800 }
801 false => false,
802 }
803 }
804
805 fn table(&mut self, name: &str) -> &mut Pgsql {
806 self.params = Params::default(self.connection.mode.str().as_str());
807 let table_name = format!("{}{}", self.connection.prefix, name);
808 if !super::sql_safety::validate_table_name(&table_name) {
809 error!("Invalid table name: {}", name);
810 }
811 self.params.table = table_name.clone();
812 self.params.join_table = table_name;
813 self
814 }
815
816 fn change_table(&mut self, name: &str) -> &mut Self {
817 self.params.join_table = name.to_string();
818 self
819 }
820
821 fn autoinc(&mut self) -> &mut Self {
822 self.params.autoinc = true;
823 self
824 }
825
826 fn timestamps(&mut self) -> &mut Self {
827 self.params.timestamps = true;
828 self
829 }
830
831 fn fetch_sql(&mut self) -> &mut Self {
832 self.params.sql = true;
833 self
834 }
835
836 fn order(&mut self, field: &str, by: bool) -> &mut Self {
837 self.params.order[field] = {
838 if by {
839 "DESC"
840 } else {
841 "ASC"
842 }
843 }
844 .into();
845 self
846 }
847
848 fn group(&mut self, field: &str) -> &mut Self {
849 let fields: Vec<&str> = field.split(",").collect();
850 for field in fields.iter() {
851 let field = field.to_string();
852 self.params.group[field.as_str()] = field.clone().into();
853 self.params.fields[field.as_str()] = field.clone().into();
854 }
855 self
856 }
857
858 fn distinct(&mut self) -> &mut Self {
859 self.params.distinct = true;
860 self
861 }
862
863 fn json(&mut self, field: &str) -> &mut Self {
864 let list: Vec<&str> = field.split(",").collect();
865 for item in list.iter() {
866 self.params.json[item.to_string().as_str()] = item.to_string().into();
867 }
868 self
869 }
870
871 fn location(&mut self, field: &str) -> &mut Self {
872 let list: Vec<&str> = field.split(",").collect();
873 for item in list.iter() {
874 self.params.location[item.to_string().as_str()] = item.to_string().into();
875 }
876 self
877 }
878
879 fn field(&mut self, field: &str) -> &mut Self {
880 let list: Vec<&str> = field.split(",").collect();
881 let join_table = if self.params.join_table.is_empty() {
882 self.params.table.clone()
883 } else {
884 self.params.join_table.clone()
885 };
886 for item in list.iter() {
887 let lower = item.to_lowercase();
888 let is_expr = lower.contains("count(")
889 || lower.contains("sum(")
890 || lower.contains("avg(")
891 || lower.contains("max(")
892 || lower.contains("min(")
893 || lower.contains("case ");
894 if is_expr {
895 self.params.fields[item.to_string().as_str()] = (*item).into();
896 } else if item.contains(" as ") {
897 let text = item.split(" as ").collect::<Vec<&str>>();
898 self.params.fields[item.to_string().as_str()] =
899 format!("{}.{} as {}", join_table, text[0], text[1]).into();
900 } else {
901 self.params.fields[item.to_string().as_str()] =
902 format!("{join_table}.{item}").into();
903 }
904 }
905 self
906 }
907
908 fn field_raw(&mut self, expr: &str) -> &mut Self {
909 self.params.fields[expr] = expr.into();
910 self
911 }
912
913 fn hidden(&mut self, name: &str) -> &mut Self {
914 let hidden: Vec<&str> = name.split(",").collect();
915
916 let fields_list = self.table_info(self.params.clone().table.as_str());
917 let mut data = array![];
918 for item in fields_list.members() {
919 let _ = data.push(object! {
920 "name":item["field"].as_str().unwrap_or("")
921 });
922 }
923
924 for item in data.members() {
925 let name = item["name"].as_str().unwrap_or("");
926 if !hidden.contains(&name) {
927 self.params.fields[name] = name.into();
928 }
929 }
930 self
931 }
932
933 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
934 for f in field.split('|') {
935 if !super::sql_safety::validate_field_name(f) {
936 error!("Invalid field name: {}", f);
937 }
938 }
939 if !super::sql_safety::validate_compare_orator(compare) {
940 error!("Invalid compare operator: {}", compare);
941 }
942 let join_table = if self.params.join_table.is_empty() {
943 self.params.table.clone()
944 } else {
945 self.params.join_table.clone()
946 };
947 if value.is_boolean() {
948 let bool_val = value.as_bool().unwrap_or(false);
949 self.params
950 .where_and
951 .push(format!("{join_table}.{field} {compare} {bool_val}"));
952 return self;
953 }
954 match compare {
955 "between" => {
956 self.params.where_and.push(format!(
957 "{}.{} between '{}' AND '{}'",
958 join_table, field, value[0], value[1]
959 ));
960 }
961 "set" => {
962 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
963 let mut wheredata = vec![];
964 for item in list.iter() {
965 wheredata.push(format!(
966 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
967 ));
968 }
969 self.params
970 .where_and
971 .push(format!("({})", wheredata.join(" or ")));
972 }
973 "notin" => {
974 let mut text = String::new();
975 for item in value.members() {
976 text = format!("{text},'{item}'");
977 }
978 text = text.trim_start_matches(",").into();
979 self.params
980 .where_and
981 .push(format!("{join_table}.{field} not in ({text})"));
982 }
983 "is" => {
984 self.params
985 .where_and
986 .push(format!("{join_table}.{field} is {value}"));
987 }
988 "isnot" => {
989 self.params
990 .where_and
991 .push(format!("{join_table}.{field} is not {value}"));
992 }
993 "notlike" => {
994 self.params
995 .where_and
996 .push(format!("{join_table}.{field} not like '{value}'"));
997 }
998 "in" => {
999 if value.is_array() && value.is_empty() {
1000 self.params.where_and.push("1=0".to_string());
1001 return self;
1002 }
1003 let mut text = String::new();
1004 if value.is_array() {
1005 for item in value.members() {
1006 text = format!("{text},'{item}'");
1007 }
1008 } else if value.is_null() {
1009 text = format!("{text},null");
1010 } else {
1011 let value = value.as_str().unwrap_or("");
1012
1013 let value: Vec<&str> = value.split(",").collect();
1014 for item in value.iter() {
1015 text = format!("{text},'{item}'");
1016 }
1017 }
1018 text = text.trim_start_matches(",").into();
1019
1020 self.params
1021 .where_and
1022 .push(format!("{join_table}.{field} {compare} ({text})"));
1023 }
1024 "json_contains" => {
1028 if value.is_array() {
1029 if value.is_empty() {
1030 self.params.where_and.push("1=0".to_string());
1031 } else {
1032 let mut parts = vec![];
1033 for item in value.members() {
1034 let escaped = super::sql_safety::escape_string(&item.to_string());
1035 parts.push(format!(
1036 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1037 escaped
1038 ));
1039 }
1040 self.params
1041 .where_and
1042 .push(format!("({})", parts.join(" OR ")));
1043 }
1044 } else {
1045 let escaped = super::sql_safety::escape_string(&value.to_string());
1046 self.params.where_and.push(format!(
1047 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1048 escaped
1049 ));
1050 }
1051 }
1052 _ => {
1053 self.params
1054 .where_and
1055 .push(format!("{join_table}.{field} {compare} '{value}'"));
1056 }
1057 }
1058 self
1059 }
1060
1061 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1062 for f in field.split('|') {
1063 if !super::sql_safety::validate_field_name(f) {
1064 error!("Invalid field name: {}", f);
1065 }
1066 }
1067 if !super::sql_safety::validate_compare_orator(compare) {
1068 error!("Invalid compare operator: {}", compare);
1069 }
1070 let join_table = if self.params.join_table.is_empty() {
1071 self.params.table.clone()
1072 } else {
1073 self.params.join_table.clone()
1074 };
1075
1076 if value.is_boolean() {
1077 let bool_val = value.as_bool().unwrap_or(false);
1078 self.params
1079 .where_or
1080 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1081 return self;
1082 }
1083
1084 match compare {
1085 "between" => {
1086 self.params.where_or.push(format!(
1087 "{}.{} between '{}' AND '{}'",
1088 join_table, field, value[0], value[1]
1089 ));
1090 }
1091 "set" => {
1092 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1093 let mut wheredata = vec![];
1094 for item in list.iter() {
1095 wheredata.push(format!(
1096 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1097 ));
1098 }
1099 self.params
1100 .where_or
1101 .push(format!("({})", wheredata.join(" or ")));
1102 }
1103 "notin" => {
1104 let mut text = String::new();
1105 for item in value.members() {
1106 text = format!("{text},'{item}'");
1107 }
1108 text = text.trim_start_matches(",").into();
1109 self.params
1110 .where_or
1111 .push(format!("{join_table}.{field} not in ({text})"));
1112 }
1113 "is" => {
1114 self.params
1115 .where_or
1116 .push(format!("{join_table}.{field} is {value}"));
1117 }
1118 "isnot" => {
1119 self.params
1120 .where_or
1121 .push(format!("{join_table}.{field} is not {value}"));
1122 }
1123 "in" => {
1124 if value.is_array() && value.is_empty() {
1125 self.params.where_or.push("1=0".to_string());
1126 return self;
1127 }
1128 let mut text = String::new();
1129 if value.is_array() {
1130 for item in value.members() {
1131 text = format!("{text},'{item}'");
1132 }
1133 } else {
1134 let value = value.as_str().unwrap_or("");
1135 let value: Vec<&str> = value.split(",").collect();
1136 for item in value.iter() {
1137 text = format!("{text},'{item}'");
1138 }
1139 }
1140 text = text.trim_start_matches(",").into();
1141 self.params
1142 .where_or
1143 .push(format!("{join_table}.{field} {compare} ({text})"));
1144 }
1145 "json_contains" => {
1149 if value.is_array() {
1150 if value.is_empty() {
1151 self.params.where_or.push("1=0".to_string());
1152 } else {
1153 let mut parts = vec![];
1154 for item in value.members() {
1155 let escaped = super::sql_safety::escape_string(&item.to_string());
1156 parts.push(format!(
1157 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1158 escaped
1159 ));
1160 }
1161 self.params
1162 .where_or
1163 .push(format!("({})", parts.join(" OR ")));
1164 }
1165 } else {
1166 let escaped = super::sql_safety::escape_string(&value.to_string());
1167 self.params.where_or.push(format!(
1168 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1169 escaped
1170 ));
1171 }
1172 }
1173 _ => {
1174 self.params
1175 .where_or
1176 .push(format!("{join_table}.{field} {compare} '{value}'"));
1177 }
1178 }
1179 self
1180 }
1181
1182 fn where_raw(&mut self, expr: &str) -> &mut Self {
1183 self.params.where_and.push(expr.to_string());
1184 self
1185 }
1186
1187 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1188 self.params
1189 .where_and
1190 .push(format!("\"{field}\" IN ({sub_sql})"));
1191 self
1192 }
1193
1194 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1195 self.params
1196 .where_and
1197 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1198 self
1199 }
1200
1201 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1202 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1203 self
1204 }
1205
1206 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1207 self.params
1208 .where_and
1209 .push(format!("NOT EXISTS ({sub_sql})"));
1210 self
1211 }
1212
1213 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1214 self.params.where_column = format!(
1215 "{}.{} {} {}.{}",
1216 self.params.table, field_a, compare, self.params.table, field_b
1217 );
1218 self
1219 }
1220
1221 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1222 self.params
1223 .update_column
1224 .push(format!("{field_a} = {compare}"));
1225 self
1226 }
1227
1228 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1229 self.params.page = page;
1230 self.params.limit = limit;
1231 self
1232 }
1233
1234 fn limit(&mut self, count: i32) -> &mut Self {
1235 self.params.limit_only = count;
1236 self
1237 }
1238
1239 fn column(&mut self, field: &str) -> JsonValue {
1240 self.field(field);
1241 let sql = self.params.select_sql();
1242
1243 if self.params.sql {
1244 return JsonValue::from(sql);
1245 }
1246 let (state, data) = self.query(sql.as_str());
1247 match state {
1248 true => {
1249 let mut list = array![];
1250 for item in data.members() {
1251 if self.params.json[field].is_empty() {
1252 let _ = list.push(item[field].clone());
1253 } else {
1254 let data =
1255 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1256 let _ = list.push(data);
1257 }
1258 }
1259 list
1260 }
1261 false => {
1262 array![]
1263 }
1264 }
1265 }
1266
1267 fn count(&mut self) -> JsonValue {
1268 self.params.fields = json::object! {};
1269 self.params.fields["count"] = "count(*) as count".to_string().into();
1270 let sql = self.params.select_sql();
1271 if self.params.sql {
1272 return JsonValue::from(sql.clone());
1273 }
1274 let (state, data) = self.query(sql.as_str());
1275 if state {
1276 data[0]["count"].clone()
1277 } else {
1278 JsonValue::from(0)
1279 }
1280 }
1281
1282 fn max(&mut self, field: &str) -> JsonValue {
1283 self.params.fields[field] = format!("max({field}) as {field}").into();
1284 let sql = self.params.select_sql();
1285 if self.params.sql {
1286 return JsonValue::from(sql.clone());
1287 }
1288 let (state, data) = self.query(sql.as_str());
1289 if state {
1290 if data.len() > 1 {
1291 return data.clone();
1292 }
1293 data[0][field].clone()
1294 } else {
1295 JsonValue::from(0)
1296 }
1297 }
1298
1299 fn min(&mut self, field: &str) -> JsonValue {
1300 self.params.fields[field] = format!("min({field}) as {field}").into();
1301 let sql = self.params.select_sql();
1302 if self.params.sql {
1303 return JsonValue::from(sql.clone());
1304 }
1305 let (state, data) = self.query(sql.as_str());
1306 if state {
1307 if data.len() > 1 {
1308 return data;
1309 }
1310 data[0][field].clone()
1311 } else {
1312 JsonValue::from(0)
1313 }
1314 }
1315
1316 fn sum(&mut self, field: &str) -> JsonValue {
1317 self.params.fields[field] = format!("sum({field}) as {field}").into();
1318 let sql = self.params.select_sql();
1319 if self.params.sql {
1320 return JsonValue::from(sql.clone());
1321 }
1322 let (state, data) = self.query(sql.as_str());
1323 match state {
1324 true => {
1325 if data.len() > 1 {
1326 return data;
1327 }
1328 data[0][field].clone()
1329 }
1330 false => JsonValue::from(0),
1331 }
1332 }
1333
1334 fn avg(&mut self, field: &str) -> JsonValue {
1335 self.params.fields[field] = format!("avg({field}) as {field}").into();
1336 let sql = self.params.select_sql();
1337 if self.params.sql {
1338 return JsonValue::from(sql.clone());
1339 }
1340 let (state, data) = self.query(sql.as_str());
1341 if state {
1342 if data.len() > 1 {
1343 return data;
1344 }
1345 data[0][field].clone()
1346 } else {
1347 JsonValue::from(0)
1348 }
1349 }
1350
1351 fn having(&mut self, expr: &str) -> &mut Self {
1352 self.params.having.push(expr.to_string());
1353 self
1354 }
1355
1356 fn select(&mut self) -> JsonValue {
1357 let sql = self.params.select_sql();
1358 if self.params.sql {
1359 return JsonValue::from(sql.clone());
1360 }
1361 let (state, mut data) = self.query(sql.as_str());
1362 match state {
1363 true => {
1364 for (field, _) in self.params.json.entries() {
1365 for item in data.members_mut() {
1366 if !item[field].is_empty() {
1367 let json = item[field].to_string();
1368 item[field] = match json::parse(&json) {
1369 Ok(e) => e,
1370 Err(_) => JsonValue::from(json),
1371 };
1372 }
1373 }
1374 }
1375 data.clone()
1376 }
1377 false => array![],
1378 }
1379 }
1380
1381 fn find(&mut self) -> JsonValue {
1382 self.params.page = 1;
1383 self.params.limit = 1;
1384 let sql = self.params.select_sql();
1385 if self.params.sql {
1386 return JsonValue::from(sql.clone());
1387 }
1388 let (state, mut data) = self.query(sql.as_str());
1389 match state {
1390 true => {
1391 if data.is_empty() {
1392 return object! {};
1393 }
1394 for (field, _) in self.params.json.entries() {
1395 if !data[0][field].is_empty() {
1396 let json = data[0][field].to_string();
1397 let json = json::parse(&json).unwrap_or(array![]);
1398 data[0][field] = json;
1399 } else {
1400 data[0][field] = array![];
1401 }
1402 }
1403 data[0].clone()
1404 }
1405 false => {
1406 object! {}
1407 }
1408 }
1409 }
1410
1411 fn value(&mut self, field: &str) -> JsonValue {
1412 self.params.fields = object! {};
1413 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1414 self.params.page = 1;
1415 self.params.limit = 1;
1416 let sql = self.params.select_sql();
1417 if self.params.sql {
1418 return JsonValue::from(sql.clone());
1419 }
1420 let (state, mut data) = self.query(sql.as_str());
1421 match state {
1422 true => {
1423 for (field, _) in self.params.json.entries() {
1424 if !data[0][field].is_empty() {
1425 let json = data[0][field].to_string();
1426 let json = json::parse(&json).unwrap_or(array![]);
1427 data[0][field] = json;
1428 } else {
1429 data[0][field] = array![];
1430 }
1431 }
1432 data[0][field].clone()
1433 }
1434 false => {
1435 if self.connection.debug {
1436 info!("{data:?}");
1437 }
1438 JsonValue::Null
1439 }
1440 }
1441 }
1442
1443 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1444 let fields_list = self.table_info(&self.params.table.clone());
1445 let mut fields = vec![];
1446 let mut values = vec![];
1447 if !self.params.autoinc && data["id"].is_empty() {
1448 let thread_id = format!("{:?}", std::thread::current().id());
1449 let thread_num: u64 = thread_id
1450 .trim_start_matches("ThreadId(")
1451 .trim_end_matches(")")
1452 .parse()
1453 .unwrap_or(0);
1454 data["id"] = format!(
1455 "{:X}{:X}",
1456 Local::now().timestamp_nanos_opt().unwrap_or(0),
1457 thread_num
1458 )
1459 .into();
1460 }
1461 for (field, value) in data.entries() {
1462 fields.push(format!("\"{}\"", field));
1463
1464 if value.is_string() {
1465 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1466 continue;
1467 } else if value.is_array() {
1468 if self.params.json[field].is_empty() {
1469 let array = value
1470 .members()
1471 .map(|x| x.as_str().unwrap_or(""))
1472 .collect::<Vec<&str>>()
1473 .join(",");
1474 values.push(format!("'{}'", array.replace("'", "''")));
1475 } else {
1476 let json = value.to_string();
1477 let json = json.replace("'", "''");
1478 values.push(format!("'{json}'"));
1479 }
1480 continue;
1481 } else if value.is_object() {
1482 if self.params.json[field].is_empty() {
1483 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1484 } else {
1485 let json = value.to_string();
1486 let json = json.replace("'", "''");
1487 values.push(format!("'{json}'"));
1488 }
1489 continue;
1490 } else if value.is_number() {
1491 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1492 if col_type == "boolean" {
1493 let bool_val = value.as_i64().unwrap_or(0) != 0;
1494 values.push(format!("{bool_val}"));
1495 } else if col_type.contains("int") {
1496 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1497 } else {
1498 values.push(format!("{value}"));
1499 }
1500 continue;
1501 } else if value.is_boolean() || value.is_null() {
1502 values.push(format!("{value}"));
1503 continue;
1504 } else {
1505 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1506 continue;
1507 }
1508 }
1509 let fields = fields.join(",");
1510 let values = values.join(",");
1511
1512 let sql = format!(
1513 "INSERT INTO {} ({}) VALUES ({});",
1514 self.params.table, fields, values
1515 );
1516 if self.params.sql {
1517 return JsonValue::from(sql.clone());
1518 }
1519 let (state, ids) = self.execute(sql.as_str());
1520
1521 match state {
1522 true => match self.params.autoinc {
1523 true => ids.clone(),
1524 false => data["id"].clone(),
1525 },
1526 false => {
1527 let thread_id = format!("{:?}", thread::current().id());
1528 error!("添加失败: {thread_id} {ids:?} {sql}");
1529 JsonValue::from("")
1530 }
1531 }
1532 }
1533 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1534 let fields_list = self.table_info(&self.params.table.clone());
1535 let mut fields = String::new();
1536 if !self.params.autoinc && data[0]["id"].is_empty() {
1537 data[0]["id"] = "".into();
1538 }
1539 for (field, _) in data[0].entries() {
1540 fields = format!("{fields},\"{field}\"");
1541 }
1542 fields = fields.trim_start_matches(",").to_string();
1543
1544 let core_count = num_cpus::get();
1545 let mut p = pools::Pool::new(core_count * 4);
1546
1547 let autoinc = self.params.autoinc;
1548 for list in data.members() {
1549 let mut item = list.clone();
1550 let i = br_fields::str::Code::verification_code(3);
1551 let fields_list_new = fields_list.clone();
1552 p.execute(move |pcindex| {
1553 if !autoinc && item["id"].is_empty() {
1554 let id = format!(
1555 "{:X}{:X}{}",
1556 Local::now().timestamp_nanos_opt().unwrap_or(0),
1557 pcindex,
1558 i
1559 );
1560 item["id"] = id.into();
1561 }
1562 let mut values = "".to_string();
1563 for (field, value) in item.entries() {
1564 if value.is_string() {
1565 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1566 } else if value.is_number() {
1567 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1568 if col_type == "boolean" {
1569 let bool_val = value.as_i64().unwrap_or(0) != 0;
1570 values = format!("{values},{bool_val}");
1571 } else if col_type.contains("int") {
1572 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1573 } else {
1574 values = format!("{values},{value}");
1575 }
1576 } else if value.is_boolean() {
1577 values = format!("{values},{value}");
1578 continue;
1579 } else {
1580 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1581 }
1582 }
1583 values = format!("({})", values.trim_start_matches(","));
1584 array![item["id"].clone(), values]
1585 });
1586 }
1587 let (ids_list, mut values) = p.insert_all();
1588 values = values.trim_start_matches(",").to_string();
1589 let sql = format!(
1590 "INSERT INTO {} ({}) VALUES {};",
1591 self.params.table, fields, values
1592 );
1593
1594 if self.params.sql {
1595 return JsonValue::from(sql.clone());
1596 }
1597 let (state, data) = self.execute(sql.as_str());
1598 match state {
1599 true => match autoinc {
1600 true => data,
1601 false => JsonValue::from(ids_list),
1602 },
1603 false => {
1604 error!("insert_all: {data:?}");
1605
1606 array![]
1607 }
1608 }
1609 }
1610 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1611 let fields_list = self.table_info(&self.params.table.clone());
1612 let mut fields = vec![];
1613 let mut values = vec![];
1614 if !self.params.autoinc && data["id"].is_empty() {
1615 let thread_id = format!("{:?}", std::thread::current().id());
1616 let thread_num: u64 = thread_id
1617 .trim_start_matches("ThreadId(")
1618 .trim_end_matches(")")
1619 .parse()
1620 .unwrap_or(0);
1621 data["id"] = format!(
1622 "{:X}{:X}",
1623 Local::now().timestamp_nanos_opt().unwrap_or(0),
1624 thread_num
1625 )
1626 .into();
1627 }
1628 for (field, value) in data.entries() {
1629 fields.push(format!("\"{}\"", field));
1630
1631 if value.is_string() {
1632 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1633 continue;
1634 } else if value.is_array() {
1635 if self.params.json[field].is_empty() {
1636 let array = value
1637 .members()
1638 .map(|x| x.as_str().unwrap_or(""))
1639 .collect::<Vec<&str>>()
1640 .join(",");
1641 values.push(format!("'{}'", array.replace("'", "''")));
1642 } else {
1643 let json = value.to_string();
1644 let json = json.replace("'", "''");
1645 values.push(format!("'{json}'"));
1646 }
1647 continue;
1648 } else if value.is_object() {
1649 if self.params.json[field].is_empty() {
1650 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1651 } else {
1652 let json = value.to_string();
1653 let json = json.replace("'", "''");
1654 values.push(format!("'{json}'"));
1655 }
1656 continue;
1657 } else if value.is_number() {
1658 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1659 if col_type == "boolean" {
1660 let bool_val = value.as_i64().unwrap_or(0) != 0;
1661 values.push(format!("{bool_val}"));
1662 } else if col_type.contains("int") {
1663 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1664 } else {
1665 values.push(format!("{value}"));
1666 }
1667 continue;
1668 } else if value.is_boolean() || value.is_null() {
1669 values.push(format!("{value}"));
1670 continue;
1671 } else {
1672 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1673 continue;
1674 }
1675 }
1676
1677 let conflict_cols: Vec<String> = conflict_fields
1678 .iter()
1679 .map(|f| format!("\"{}\"", f))
1680 .collect();
1681
1682 let update_set: Vec<String> = fields
1683 .iter()
1684 .filter(|f| {
1685 let name = f.trim_matches('"');
1686 !conflict_fields.contains(&name) && name != "id"
1687 })
1688 .map(|f| format!("{f}=EXCLUDED.{f}"))
1689 .collect();
1690
1691 let fields_str = fields.join(",");
1692 let values_str = values.join(",");
1693
1694 let sql = format!(
1695 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1696 self.params.table,
1697 fields_str,
1698 values_str,
1699 conflict_cols.join(","),
1700 update_set.join(",")
1701 );
1702 if self.params.sql {
1703 return JsonValue::from(sql.clone());
1704 }
1705 let (state, result) = self.execute(sql.as_str());
1706 match state {
1707 true => match self.params.autoinc {
1708 true => result.clone(),
1709 false => data["id"].clone(),
1710 },
1711 false => {
1712 let thread_id = format!("{:?}", thread::current().id());
1713 error!("upsert失败: {thread_id} {result:?} {sql}");
1714 JsonValue::from("")
1715 }
1716 }
1717 }
1718 fn update(&mut self, data: JsonValue) -> JsonValue {
1719 let fields_list = self.table_info(&self.params.table.clone());
1720 let mut values = vec![];
1721 for (field, value) in data.entries() {
1722 if value.is_string() {
1723 values.push(format!(
1724 "\"{}\"='{}'",
1725 field,
1726 value.to_string().replace("'", "''")
1727 ));
1728 } else if value.is_number() {
1729 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1730 if col_type == "boolean" {
1731 let bool_val = value.as_i64().unwrap_or(0) != 0;
1732 values.push(format!("\"{field}\"= {bool_val}"));
1733 } else if col_type.contains("int") {
1734 values.push(format!(
1735 "\"{}\"= {}",
1736 field,
1737 value.as_f64().unwrap_or(0.0) as i64
1738 ));
1739 } else {
1740 values.push(format!("\"{field}\"= {value}"));
1741 }
1742 } else if value.is_array() {
1743 if self.params.json[field].is_empty() {
1744 let array = value
1745 .members()
1746 .map(|x| x.as_str().unwrap_or(""))
1747 .collect::<Vec<&str>>()
1748 .join(",");
1749 values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1750 } else {
1751 let json = value.to_string();
1752 let json = json.replace("'", "''");
1753 values.push(format!("\"{field}\"='{json}'"));
1754 }
1755 continue;
1756 } else if value.is_object() {
1757 if self.params.json[field].is_empty() {
1758 values.push(format!(
1759 "\"{}\"='{}'",
1760 field,
1761 value.to_string().replace("'", "''")
1762 ));
1763 } else {
1764 if value.is_empty() {
1765 values.push(format!("\"{field}\"=''"));
1766 continue;
1767 }
1768 let json = value.to_string();
1769 let json = json.replace("'", "''");
1770 values.push(format!("\"{field}\"='{json}'"));
1771 }
1772 continue;
1773 } else if value.is_boolean() || value.is_null() {
1774 values.push(format!("\"{field}\"= {value}"));
1775 } else {
1776 values.push(format!("\"{field}\"=\"{value}\""));
1777 }
1778 }
1779
1780 for (field, value) in self.params.inc_dec.entries() {
1781 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1782 }
1783 if !self.params.update_column.is_empty() {
1784 values.extend(self.params.update_column.clone());
1785 }
1786 let values = values.join(",");
1787
1788 let sql = format!(
1789 "UPDATE {} SET {} {};",
1790 self.params.table.clone(),
1791 values,
1792 self.params.where_sql()
1793 );
1794 if self.params.sql {
1795 return JsonValue::from(sql.clone());
1796 }
1797 let (state, data) = self.execute(sql.as_str());
1798 if state {
1799 data
1800 } else {
1801 let thread_id = format!("{:?}", thread::current().id());
1802 error!("update: {thread_id} {data:?} {sql}");
1803 0.into()
1804 }
1805 }
1806 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1807 let fields_list = self.table_info(&self.params.table.clone());
1808 let mut values = vec![];
1809
1810 let mut ids = vec![];
1811 for (field, _) in data[0].entries() {
1812 if field == "id" {
1813 continue;
1814 }
1815 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1816 let mut fields = vec![];
1817 for row in data.members() {
1818 let value = row[field].clone();
1819 let id = row["id"].clone();
1820 ids.push(id.clone());
1821 if value.is_string() {
1822 fields.push(format!(
1823 "WHEN '{}' THEN '{}'",
1824 id,
1825 value.to_string().replace("'", "''")
1826 ));
1827 } else if value.is_array() || value.is_object() {
1828 if self.params.json[field].is_empty() {
1829 fields.push(format!(
1830 "WHEN '{}' THEN '{}'",
1831 id,
1832 value.to_string().replace("'", "''")
1833 ));
1834 } else {
1835 let json = value.to_string();
1836 let json = json.replace("'", "''");
1837 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1838 }
1839 continue;
1840 } else if value.is_number() {
1841 if col_type == "boolean" {
1842 let bool_val = value.as_i64().unwrap_or(0) != 0;
1843 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1844 } else {
1845 fields.push(format!("WHEN '{id}' THEN {value}"));
1846 }
1847 } else if value.is_boolean() || value.is_null() {
1848 fields.push(format!("WHEN '{id}' THEN {value}"));
1849 } else {
1850 fields.push(format!(
1851 "WHEN '{}' THEN '{}'",
1852 id,
1853 value.to_string().replace("'", "''")
1854 ));
1855 }
1856 }
1857 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1858 }
1859 self.where_and("id", "in", ids.into());
1860 for (field, value) in self.params.inc_dec.entries() {
1861 values.push(format!("{} = {}", field, value.to_string().clone()));
1862 }
1863
1864 let values = values.join(",");
1865 let sql = format!(
1866 "UPDATE {} SET {} {} {};",
1867 self.params.table.clone(),
1868 values,
1869 self.params.where_sql(),
1870 self.params.page_limit_sql()
1871 );
1872 if self.params.sql {
1873 return JsonValue::from(sql.clone());
1874 }
1875 let (state, data) = self.execute(sql.as_str());
1876 if state {
1877 data
1878 } else {
1879 error!("update_all: {data:?}");
1880 JsonValue::from(0)
1881 }
1882 }
1883
1884 fn delete(&mut self) -> JsonValue {
1885 let sql = format!(
1886 "delete FROM {} {} {};",
1887 self.params.table.clone(),
1888 self.params.where_sql(),
1889 self.params.page_limit_sql()
1890 );
1891 if self.params.sql {
1892 return JsonValue::from(sql.clone());
1893 }
1894 let (state, data) = self.execute(sql.as_str());
1895 match state {
1896 true => data,
1897 false => {
1898 error!("delete 失败>>> {data:?}");
1899 JsonValue::from(0)
1900 }
1901 }
1902 }
1903
1904 fn transaction(&mut self) -> bool {
1905 let thread_id = format!("{:?}", thread::current().id());
1906 let key = format!("{}{}", self.default, thread_id);
1907
1908 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1909 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1910 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1911 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1912 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1913 return true;
1914 }
1915
1916 let mut conn = None;
1918 for attempt in 0..3u8 {
1919 match self.client.get_connect_for_transaction() {
1920 Ok(mut c) => {
1921 if c.is_valid() {
1922 conn = Some(c);
1923 break;
1924 }
1925 warn!("事务连接无效(第{}次)", attempt + 1);
1926 self.client.release_transaction_conn();
1927 }
1928 Err(e) => {
1929 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1930 }
1931 }
1932 if attempt < 2 {
1933 thread::sleep(std::time::Duration::from_millis(200));
1934 }
1935 }
1936 let mut conn = match conn {
1937 Some(c) => c,
1938 None => {
1939 error!("获取事务连接重试耗尽");
1940 return false;
1941 }
1942 };
1943
1944 if let Err(e) = conn.execute("START TRANSACTION") {
1945 error!("启动事务失败: {e}");
1946 self.client.release_transaction_conn();
1947 return false;
1948 }
1949
1950 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1951 true
1952 }
1953 fn commit(&mut self) -> bool {
1954 let thread_id = format!("{:?}", thread::current().id());
1955 let key = format!("{}{}", self.default, thread_id);
1956
1957 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1958 error!("commit: 没有活跃的事务");
1959 return false;
1960 }
1961
1962 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1963 if depth > 1 {
1964 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1965 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1966 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1967 return true;
1968 }
1969
1970 let commit_result =
1971 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1972
1973 let success = match commit_result {
1974 Some(Ok(_)) => true,
1975 Some(Err(e)) => {
1976 error!("提交事务失败: {e}");
1977 false
1978 }
1979 None => {
1980 error!("提交事务失败: 未找到连接");
1981 false
1982 }
1983 };
1984
1985 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
1986 self.client.release_transaction_conn_with_conn(conn);
1987 } else {
1988 self.client.release_transaction_conn();
1989 }
1990 success
1991 }
1992
1993 fn rollback(&mut self) -> bool {
1994 let thread_id = format!("{:?}", thread::current().id());
1995 let key = format!("{}{}", self.default, thread_id);
1996
1997 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1998 error!("rollback: 没有活跃的事务");
1999 return false;
2000 }
2001
2002 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
2003 if depth > 1 {
2004 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
2005 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
2006 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2007 return true;
2008 }
2009
2010 let rollback_result =
2011 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
2012
2013 let success = match rollback_result {
2014 Some(Ok(_)) => true,
2015 Some(Err(e)) => {
2016 error!("回滚失败: {e}");
2017 false
2018 }
2019 None => {
2020 error!("回滚失败: 未找到连接");
2021 false
2022 }
2023 };
2024
2025 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
2026 self.client.release_transaction_conn_with_conn(conn);
2027 } else {
2028 self.client.release_transaction_conn();
2029 }
2030 success
2031 }
2032
2033 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2034 let (state, data) = self.query(sql);
2035 match state {
2036 true => Ok(data),
2037 false => Err(data.to_string()),
2038 }
2039 }
2040
2041 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2042 let (state, data) = self.execute(sql);
2043 match state {
2044 true => Ok(data),
2045 false => Err(data.to_string()),
2046 }
2047 }
2048
2049 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2050 self.params.inc_dec[field] = format!("{field} + {num}").into();
2051 self
2052 }
2053
2054 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2055 self.params.inc_dec[field] = format!("{field} - {num}").into();
2056 self
2057 }
2058 fn buildsql(&mut self) -> String {
2059 self.fetch_sql();
2060 let sql = self.select().to_string();
2061 format!("( {} ) {}", sql, self.params.table)
2062 }
2063
2064 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2065 for field in fields {
2066 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2067 }
2068 self
2069 }
2070
2071 fn join(
2072 &mut self,
2073 main_table: &str,
2074 main_fields: &str,
2075 right_table: &str,
2076 right_fields: &str,
2077 ) -> &mut Self {
2078 let main_table = if main_table.is_empty() {
2079 self.params.table.clone()
2080 } else {
2081 main_table.to_string()
2082 };
2083 self.params.join_table = right_table.to_string();
2084 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2085 self
2086 }
2087
2088 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2089 let main_fields = if main_fields.is_empty() {
2090 "id"
2091 } else {
2092 main_fields
2093 };
2094 let second_fields = if second_fields.is_empty() {
2095 self.params.table.clone()
2096 } else {
2097 second_fields.to_string().clone()
2098 };
2099 let sec_table_name = format!("{}{}", table, "_2");
2100 let second_table = format!("{} {}", table, sec_table_name.clone());
2101 self.params.join_table = sec_table_name.clone();
2102 self.params.join.push(format!(
2103 " INNER JOIN {} ON {}.{} = {}.{}",
2104 second_table, self.params.table, main_fields, sec_table_name, second_fields
2105 ));
2106 self
2107 }
2108
2109 fn join_right(
2110 &mut self,
2111 main_table: &str,
2112 main_fields: &str,
2113 right_table: &str,
2114 right_fields: &str,
2115 ) -> &mut Self {
2116 let main_table = if main_table.is_empty() {
2117 self.params.table.clone()
2118 } else {
2119 main_table.to_string()
2120 };
2121 self.params.join_table = right_table.to_string();
2122 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2123 self
2124 }
2125
2126 fn join_full(
2127 &mut self,
2128 main_table: &str,
2129 main_fields: &str,
2130 right_table: &str,
2131 right_fields: &str,
2132 ) -> &mut Self {
2133 let main_table = if main_table.is_empty() {
2134 self.params.table.clone()
2135 } else {
2136 main_table.to_string()
2137 };
2138 self.params.join_table = right_table.to_string();
2139 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2140 self
2141 }
2142
2143 fn union(&mut self, sub_sql: &str) -> &mut Self {
2144 self.params.unions.push(format!("UNION {sub_sql}"));
2145 self
2146 }
2147
2148 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2149 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2150 self
2151 }
2152
2153 fn lock_for_update(&mut self) -> &mut Self {
2154 self.params.lock_mode = "FOR UPDATE".to_string();
2155 self
2156 }
2157
2158 fn lock_for_share(&mut self) -> &mut Self {
2159 self.params.lock_mode = "FOR SHARE".to_string();
2160 self
2161 }
2162}