1use crate::config::Connection;
2use crate::TABLE_FIELDS;
3use crate::pools;
4use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
5use crate::types::{DbMode, Mode, Params, TableOptions};
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
/// PostgreSQL backend handle: connection settings, query-builder state and the
/// connection pool statements are run on.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Name given to `connect`; prefixed to the thread id to form the transaction key
    /// and to the table name to form the column-metadata cache key.
    pub default: String,
    /// Query-builder state filled in by the chained builder methods.
    pub params: Params,
    /// Connection pool obtained from `br_pgsql`.
    pub client: Pools,
}
21
22impl Pgsql {
23 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24 let port = connection
25 .hostport
26 .parse::<i32>()
27 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29 let cp_connection = connection.clone();
30 let config = object! {
31 debug: cp_connection.debug,
32 username: cp_connection.username,
33 userpass: cp_connection.userpass,
34 database: cp_connection.database,
35 hostname: cp_connection.hostname,
36 hostport: port,
37 charset: cp_connection.charset.str(),
38 };
39 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
40
41 let pools = pgsql.pools()?;
42 Ok(Self {
43 connection,
44 default: default.clone(),
45 params: Params::default("pgsql"),
46 client: pools,
47 })
48 }
49
50 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
51 let thread_id = format!("{:?}", thread::current().id());
52 let key = format!("{}{}", self.default, thread_id);
53
54 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
55 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
56
57 match result {
58 Some(Ok(e)) => {
59 if self.connection.debug {
60 info!("查询成功: {} {}", thread_id.clone(), sql);
61 }
62 (true, e.rows)
63 }
64 Some(Err(e)) => {
65 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
66 (false, JsonValue::from(e.to_string()))
67 }
68 None => {
69 error!("事务查询失败: 未找到事务连接 {thread_id}");
70 (false, JsonValue::from("未找到事务连接"))
71 }
72 }
73 } else {
74 let mut guard = match self.client.get_guard() {
75 Ok(g) => g,
76 Err(e) => {
77 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
79 return (false, JsonValue::from(e.to_string()));
80 }
81 };
82 match guard.conn().query(sql) {
83 Ok(e) => {
84 if self.connection.debug {
85 info!("查询成功: {} {}", thread_id.clone(), sql);
86 }
87 (true, e.rows)
88 }
89 Err(ref e) if Self::is_retriable_error(e) => {
90 guard.discard();
93 self.client.flush_idle();
95 warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
96 thread::sleep(std::time::Duration::from_millis(200));
97 let mut guard2 = match self.client.get_guard() {
98 Ok(g) => g,
99 Err(e) => {
100 error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
101 return (false, JsonValue::from(e.to_string()));
102 }
103 };
104 match guard2.conn().query(sql) {
105 Ok(e) => {
106 if self.connection.debug {
107 info!("查询成功(重试): {} {}", thread_id.clone(), sql);
108 }
109 (true, e.rows)
110 }
111 Err(e) => {
112 error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
113 (false, JsonValue::from(e.to_string()))
114 }
115 }
116 }
117 Err(e) => {
118 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
119 (false, JsonValue::from(e.to_string()))
120 }
121 }
122 }
123 }
124 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
125 let thread_id = format!("{:?}", thread::current().id());
126 let key = format!("{}{}", self.default, thread_id);
127
128 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
129 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
130
131 match result {
132 Some(Ok(e)) => {
133 if self.connection.debug {
134 info!("提交成功: {} {}", thread_id.clone(), sql);
135 }
136 if sql.contains("INSERT") {
137 (true, e.rows)
138 } else {
139 (true, e.affect_count.into())
140 }
141 }
142 Some(Err(e)) => {
143 error!("事务提交失败: {thread_id} {e}");
144 (false, JsonValue::from(e.to_string()))
145 }
146 None => {
147 error!("事务执行失败: 未找到事务连接 {thread_id}");
148 (false, JsonValue::from("未找到事务连接"))
149 }
150 }
151 } else {
152 let mut guard = match self.client.get_guard() {
154 Ok(g) => g,
155 Err(e) => {
156 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
157 return (false, JsonValue::from(e.to_string()));
158 }
159 };
160 match guard.conn().execute(sql) {
161 Ok(e) => {
162 if self.connection.debug {
163 info!("提交成功: {} {}", thread_id.clone(), sql);
164 }
165 if sql.contains("INSERT") {
166 (true, e.rows)
167 } else {
168 (true, e.affect_count.into())
169 }
170 }
171 Err(ref e) if Self::is_retriable_error(e) => {
172 guard.discard();
175 self.client.flush_idle();
177 warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
178 thread::sleep(std::time::Duration::from_millis(200));
179 let mut guard2 = match self.client.get_guard() {
180 Ok(g) => g,
181 Err(e) => {
182 error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
183 return (false, JsonValue::from(e.to_string()));
184 }
185 };
186 match guard2.conn().execute(sql) {
187 Ok(e) => {
188 if self.connection.debug {
189 info!("提交成功(重试): {} {}", thread_id.clone(), sql);
190 }
191 if sql.contains("INSERT") {
192 (true, e.rows)
193 } else {
194 (true, e.affect_count.into())
195 }
196 }
197 Err(e) => {
198 error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
199 (false, JsonValue::from(e.to_string()))
200 }
201 }
202 }
203 Err(e) => {
204 error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
205 (false, JsonValue::from(e.to_string()))
206 }
207 }
208 }
209 }
210
211 fn is_retriable_error(e: &PgsqlError) -> bool {
213 matches!(
214 e,
215 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
216 )
217 }
218}
219
220impl DbMode for Pgsql {
221 fn database_tables(&mut self) -> JsonValue {
222 let sql = "SHOW TABLES".to_string();
223 match self.sql(sql.as_str()) {
224 Ok(e) => {
225 let mut list = vec![];
226 for item in e.members() {
227 for (_, value) in item.entries() {
228 list.push(value.clone());
229 }
230 }
231 list.into()
232 }
233 Err(_) => {
234 array![]
235 }
236 }
237 }
238
239 fn database_create(&mut self, name: &str) -> bool {
240 let sql = format!("CREATE DATABASE {name}");
241
242 let (state, data) = self.execute(sql.as_str());
243 match state {
244 true => data.as_bool().unwrap_or(true),
245 false => {
246 error!("创建数据库失败: {data:?}");
247 false
248 }
249 }
250 }
251
252 fn truncate(&mut self, table: &str) -> bool {
253 let sql = format!("TRUNCATE TABLE {table}");
254 let (state, _) = self.execute(sql.as_str());
255 state
256 }
257}
258
259impl Mode for Pgsql {
260 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
261 let mut sql = String::new();
262 let mut comments = vec![];
263
264 if !options.table_unique.is_empty() {
265 let full_name = format!(
266 "{}_unique_{}",
267 options.table_name,
268 options.table_unique.join("_")
269 );
270 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
271 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
272 let unique = format!(
273 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
274 name,
275 options.table_name,
276 options.table_unique.join(",")
277 );
278 comments.push(unique);
279 }
280
281 for row in options.table_index.iter() {
282 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
283 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
284 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
285 let index = format!(
286 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
287 name,
288 options.table_name,
289 row.join(",")
290 );
291 comments.push(index);
292 }
293
294 for (name, field) in options.table_fields.entries_mut() {
295 field["table_name"] = options.table_name.clone().into();
296 let row = br_fields::field("pgsql", name, field.clone());
297 let (col_sql, meta) = if let Some(idx) = row.find("--") {
298 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
299 } else {
300 (row.trim(), None)
301 };
302 if let Some(meta) = meta {
303 comments.push(format!(
304 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
305 options.table_name, name, meta
306 ));
307 }
308 sql = format!("{} {},\r\n", sql, col_sql);
309 }
310
311 let primary_key = format!(
312 "CONSTRAINT {}_{} PRIMARY KEY ({})",
313 options.table_name, options.table_key, options.table_key
314 );
315 let sql = format!(
316 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
317 options.table_name,
318 sql.trim_end_matches(",\r\n"),
319 primary_key
320 );
321 comments.insert(0, sql);
322
323 for (_name, field) in options.table_fields.entries() {
324 let _ = field["mode"].as_str();
325 }
326
327 if self.params.sql {
328 let info = comments.join("\r\n");
329 return JsonValue::from(info);
330 }
331 for comment in comments {
332 let (state, _) = self.execute(comment.as_str());
333 match state {
334 true => {}
335 false => {
336 return JsonValue::from(state);
337 }
338 }
339 }
340 JsonValue::from(true)
341 }
342
343 fn table_update(&mut self, options: TableOptions) -> JsonValue {
344 let cache_key = format!("{}{}", self.default, options.table_name);
346 let table_fields_guard = match TABLE_FIELDS.read() {
347 Ok(g) => g,
348 Err(e) => e.into_inner(),
349 };
350 if table_fields_guard.get(&cache_key).is_some() {
351 drop(table_fields_guard);
352 let mut table_fields_guard = match TABLE_FIELDS.write() {
353 Ok(g) => g,
354 Err(e) => e.into_inner(),
355 };
356 table_fields_guard.remove(&cache_key);
357 } else {
358 drop(table_fields_guard);
359 }
360 let fields_list = self.table_info(&options.table_name);
361 let mut put = vec![];
362 let mut add = vec![];
363 let mut del = vec![];
364 let mut comments = vec![];
365
366 for (key, _) in fields_list.entries() {
367 if options.table_fields[key].is_empty() {
368 del.push(key);
369 }
370 }
371 for (name, field) in options.table_fields.entries() {
372 if !fields_list[name].is_empty() {
373 let old_info = &fields_list[name];
374 let new_field_sql = br_fields::field("pgsql", name, field.clone());
375
376 let old_comment = old_info["comment"].as_str().unwrap_or("");
377 let old_type = old_info["type"].as_str().unwrap_or("");
378
379 let new_comment = if let Some(idx) = new_field_sql.find("--") {
380 new_field_sql[idx + 2..].trim()
381 } else {
382 ""
383 };
384
385 let comment_matches =
386 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
387 let old_parts: Vec<&str> = old_comment.split('|').collect();
388 let new_parts: Vec<&str> = new_comment.split('|').collect();
389 if old_parts.len() >= 4 && new_parts.len() >= 4 {
390 old_parts[..4] == new_parts[..4]
391 } else {
392 old_comment == new_comment
393 }
394 } else if !old_comment.is_empty() && !new_comment.is_empty() {
395 let old_parts: Vec<&str> = old_comment.split('|').collect();
396 let new_parts: Vec<&str> = new_comment.split('|').collect();
397 if old_parts.len() >= 2
398 && new_parts.len() >= 2
399 && old_parts.len() == new_parts.len()
400 {
401 let old_filtered: Vec<&str> = old_parts
402 .iter()
403 .filter(|v| **v != "true" && **v != "false")
404 .copied()
405 .collect();
406 let new_filtered: Vec<&str> = new_parts
407 .iter()
408 .filter(|v| **v != "true" && **v != "false")
409 .copied()
410 .collect();
411 old_filtered == new_filtered
412 } else {
413 old_comment == new_comment
414 }
415 } else {
416 old_comment == new_comment
417 };
418
419 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
420 let new_type = if sql_parts.len() > 1 {
421 sql_parts[1].to_lowercase()
422 } else {
423 String::new()
424 };
425
426 let type_matches = match old_type {
427 "integer" => {
428 new_type.contains("int")
429 && !new_type.contains("bigint")
430 && !new_type.contains("smallint")
431 }
432 "bigint" => new_type.contains("bigint"),
433 "smallint" => new_type.contains("smallint"),
434 "boolean" => new_type.contains("boolean"),
435 "text" => new_type.contains("text"),
436 "character varying" => {
437 if !new_type.contains("varchar") {
438 false
439 } else {
440 let old_len = old_info["max_length"].as_i64().unwrap_or(0);
441 let new_len = new_type
442 .trim_start_matches("varchar(")
443 .trim_end_matches(')')
444 .parse::<i64>()
445 .unwrap_or(0);
446 let matched = old_len == new_len || new_len == 0;
447 if !matched {
448 log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
449 }
450 old_len == new_len || new_len == 0
451 }
452 }
453 "character" => new_type.contains("char") && !new_type.contains("varchar"),
454 "numeric" => {
455 if !(new_type.contains("numeric") || new_type.contains("decimal")) {
456 false
457 } else {
458 let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
459 let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
460 let inner = new_type
461 .replace("numeric(", "")
462 .replace("decimal(", "")
463 .replace(')', "");
464 let parts: Vec<&str> = inner.split(',').collect();
465 let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
466 let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
467 old_prec == new_prec && old_scale == new_scale
468 }
469 }
470 "double precision" => {
471 new_type.contains("double") || new_type.contains("float8")
472 }
473 "real" => new_type.contains("real") || new_type.contains("float4"),
474 "timestamp without time zone" | "timestamp with time zone" => {
475 new_type.contains("timestamp")
476 }
477 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
478 "time without time zone" | "time with time zone" => {
479 new_type.contains("time") && !new_type.contains("timestamp")
480 }
481 "json" | "jsonb" => new_type.contains("json"),
482 "uuid" => new_type.contains("uuid"),
483 "bytea" => new_type.contains("bytea"),
484 _ => old_type == new_type,
485 };
486
487 if type_matches && comment_matches {
488 continue;
489 }
490
491 log::debug!(
492 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
493 options.table_name,
494 name,
495 type_matches,
496 old_type,
497 new_type,
498 comment_matches
499 );
500 put.push(name);
501 } else {
502 add.push(name);
503 }
504 }
505
506 for name in add.iter() {
507 let name = name.to_string();
508 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
509 let rows = row.split("--").collect::<Vec<&str>>();
510 comments.push(format!(
511 r#"ALTER TABLE "{}" add {};"#,
512 options.table_name,
513 rows[0].trim()
514 ));
515 if rows.len() > 1 {
516 comments.push(format!(
517 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
518 options.table_name,
519 name,
520 rows[1].trim()
521 ));
522 }
523 }
524 for name in del.iter() {
525 comments.push(format!(
526 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
527 options.table_name, name
528 ));
529 }
530 for name in put.iter() {
531 let name = name.to_string();
532 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
533 let rows = row.split("--").collect::<Vec<&str>>();
534
535 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
536
537 if sql[1].contains("BOOLEAN") {
538 let text = format!(
539 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
540 options.table_name, name
541 );
542 comments.push(text.clone());
543 let text = format!(
544 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
545 options.table_name, name, sql[1]
546 );
547 comments.push(text.clone());
548 } else {
549 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
550 let new_type_lower = sql[1].to_lowercase();
551 let is_date_to_numeric = (old_col_type == "date"
552 || old_col_type.contains("timestamp"))
553 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
554 if is_date_to_numeric {
555 comments.push(format!(
556 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
557 options.table_name, name
558 ));
559 comments.push(format!(
560 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
561 options.table_name, name, sql[1], name, name, name
562 ));
563 } else {
564 let text = format!(
565 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
566 options.table_name, name, sql[1]
567 );
568 comments.push(text.clone());
569 }
570 };
571
572 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
573 let default_value = rows[0][default_pos + 9..].trim();
574 if !default_value.is_empty() {
575 comments.push(format!(
576 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
577 options.table_name, name, default_value
578 ));
579 }
580 }
581 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
584 .as_str()
585 .unwrap_or("YES");
586 let old_is_required = old_is_nullable == "NO";
587
588 if old_is_required && name != options.table_key {
590 comments.push(format!(
591 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
592 options.table_name, name
593 ));
594 }
595
596 if rows.len() > 1 {
597 comments.push(format!(
598 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
599 options.table_name,
600 name,
601 rows[1].trim()
602 ));
603 }
604 }
605
606 let mut unique_new = vec![];
607 let mut index_new = vec![];
608 let mut primary_key = vec![];
609 let (_, index_list) = self.query(
610 format!(
611 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
612 options.table_name
613 )
614 .as_str(),
615 );
616 for item in index_list.members() {
617 let key_name = item["indexname"].as_str().unwrap_or("");
618 let indexdef = item["indexdef"].to_string();
619
620 if indexdef.contains(
621 format!(
622 "CREATE UNIQUE INDEX {}_{} ON",
623 options.table_name, options.table_key
624 )
625 .as_str(),
626 ) {
627 primary_key.push(key_name.to_string());
628 continue;
629 }
630 if indexdef.contains("CREATE UNIQUE INDEX") {
631 unique_new.push(key_name.to_string());
632 continue;
633 }
634 if indexdef.contains("CREATE INDEX") {
635 index_new.push(key_name.to_string());
636 continue;
637 }
638 }
639
640 if !options.table_unique.is_empty() {
641 let full_name = format!(
642 "{}_unique_{}",
643 options.table_name,
644 options.table_unique.join("_")
645 );
646 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
647 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
648 let unique = format!(
649 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
650 name,
651 options.table_name,
652 options.table_unique.join(",")
653 );
654 if !unique_new.contains(&name) {
655 comments.push(unique);
656 }
657 unique_new.retain(|x| *x != name);
658 }
659
660 for row in options.table_index.iter() {
661 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
662 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
663 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
664 let index = format!(
665 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
666 name,
667 options.table_name,
668 row.join(",")
669 );
670 if !index_new.contains(&name) {
671 comments.push(index);
672 }
673 index_new.retain(|x| *x != name);
674 }
675
676 for item in unique_new {
677 if item.ends_with("_pkey") {
678 continue;
679 }
680 if item.starts_with("unique_") {
681 comments.push(format!(
682 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
683 options.table_name,
684 item.clone()
685 ));
686 } else {
687 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
688 }
689 }
690 for item in index_new {
691 if item.ends_with("_pkey") {
692 continue;
693 }
694 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
695 }
696
697 if self.params.sql {
698 return JsonValue::from(comments.join(""));
699 }
700
701 if comments.is_empty() {
702 return JsonValue::from(-1);
703 }
704
705 for item in comments.iter() {
706 let (state, res) = self.execute(item.as_str());
707 match state {
708 true => {}
709 false => {
710 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
711 return JsonValue::from(0);
712 }
713 }
714 }
715 JsonValue::from(1)
716 }
717
718 fn table_info(&mut self, table: &str) -> JsonValue {
719 let cache_key = format!("{}{}", self.default, table);
721 let table_fields_guard = match TABLE_FIELDS.read() {
722 Ok(g) => g,
723 Err(e) => e.into_inner(),
724 };
725 if let Some(cached) = table_fields_guard.get(&cache_key) {
726 return cached.clone();
727 }
728 drop(table_fields_guard);
729 let sql = format!(
730 "SELECT COL.COLUMN_NAME,
731 COL.DATA_TYPE,
732 COL.IS_NULLABLE,
733 COL.CHARACTER_MAXIMUM_LENGTH,
734 COL.NUMERIC_PRECISION,
735 COL.NUMERIC_SCALE,
736 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
737 LEFT JOIN
738 pg_catalog.pg_description DESCRIPTION
739 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
740 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
741 let (state, data) = self.query(sql.as_str());
742 let mut list = object! {};
743 if state {
744 for item in data.members() {
745 let mut row = object! {};
746 row["field"] = item["column_name"].clone();
747 row["comment"] = item["comment"].clone();
748 row["type"] = item["data_type"].clone();
749 row["is_nullable"] = item["is_nullable"].clone();
750 row["max_length"] = item["character_maximum_length"].clone();
751 row["numeric_precision"] = item["numeric_precision"].clone();
752 row["numeric_scale"] = item["numeric_scale"].clone();
753 if let Some(field_name) = row["field"].as_str() {
754 list[field_name] = row.clone();
755 }
756 }
757 let mut table_fields_guard = match TABLE_FIELDS.write() {
759 Ok(g) => g,
760 Err(e) => e.into_inner(),
761 };
762 table_fields_guard.insert(cache_key, list.clone());
763 list
764 } else {
765 list
766 }
767 }
768
769 fn table_is_exist(&mut self, name: &str) -> bool {
770 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
771 let (state, data) = self.query(sql.as_str());
772 match state {
773 true => {
774 for item in data.members() {
775 if item.has_key("exists") {
776 return item["exists"].as_bool().unwrap_or(false);
777 }
778 }
779 false
780 }
781 false => false,
782 }
783 }
784
785 fn table(&mut self, name: &str) -> &mut Pgsql {
786 self.params = Params::default(self.connection.mode.str().as_str());
787 let table_name = format!("{}{}", self.connection.prefix, name);
788 if !super::sql_safety::validate_table_name(&table_name) {
789 error!("Invalid table name: {}", name);
790 }
791 self.params.table = table_name.clone();
792 self.params.join_table = table_name;
793 self
794 }
795
796 fn change_table(&mut self, name: &str) -> &mut Self {
797 self.params.join_table = name.to_string();
798 self
799 }
800
801 fn autoinc(&mut self) -> &mut Self {
802 self.params.autoinc = true;
803 self
804 }
805
806 fn timestamps(&mut self) -> &mut Self {
807 self.params.timestamps = true;
808 self
809 }
810
811 fn fetch_sql(&mut self) -> &mut Self {
812 self.params.sql = true;
813 self
814 }
815
816 fn order(&mut self, field: &str, by: bool) -> &mut Self {
817 self.params.order[field] = {
818 if by {
819 "DESC"
820 } else {
821 "ASC"
822 }
823 }
824 .into();
825 self
826 }
827
828 fn group(&mut self, field: &str) -> &mut Self {
829 let fields: Vec<&str> = field.split(",").collect();
830 for field in fields.iter() {
831 let field = field.to_string();
832 self.params.group[field.as_str()] = field.clone().into();
833 self.params.fields[field.as_str()] = field.clone().into();
834 }
835 self
836 }
837
838 fn distinct(&mut self) -> &mut Self {
839 self.params.distinct = true;
840 self
841 }
842
843 fn json(&mut self, field: &str) -> &mut Self {
844 let list: Vec<&str> = field.split(",").collect();
845 for item in list.iter() {
846 self.params.json[item.to_string().as_str()] = item.to_string().into();
847 }
848 self
849 }
850
851 fn location(&mut self, field: &str) -> &mut Self {
852 let list: Vec<&str> = field.split(",").collect();
853 for item in list.iter() {
854 self.params.location[item.to_string().as_str()] = item.to_string().into();
855 }
856 self
857 }
858
859 fn field(&mut self, field: &str) -> &mut Self {
860 let list: Vec<&str> = field.split(",").collect();
861 let join_table = if self.params.join_table.is_empty() {
862 self.params.table.clone()
863 } else {
864 self.params.join_table.clone()
865 };
866 for item in list.iter() {
867 let lower = item.to_lowercase();
868 let is_expr = lower.contains("count(")
869 || lower.contains("sum(")
870 || lower.contains("avg(")
871 || lower.contains("max(")
872 || lower.contains("min(")
873 || lower.contains("case ");
874 if is_expr {
875 self.params.fields[item.to_string().as_str()] = (*item).into();
876 } else if item.contains(" as ") {
877 let text = item.split(" as ").collect::<Vec<&str>>();
878 self.params.fields[item.to_string().as_str()] =
879 format!("{}.{} as {}", join_table, text[0], text[1]).into();
880 } else {
881 self.params.fields[item.to_string().as_str()] =
882 format!("{join_table}.{item}").into();
883 }
884 }
885 self
886 }
887
888 fn field_raw(&mut self, expr: &str) -> &mut Self {
889 self.params.fields[expr] = expr.into();
890 self
891 }
892
893 fn hidden(&mut self, name: &str) -> &mut Self {
894 let hidden: Vec<&str> = name.split(",").collect();
895
896 let fields_list = self.table_info(self.params.clone().table.as_str());
897 let mut data = array![];
898 for item in fields_list.members() {
899 let _ = data.push(object! {
900 "name":item["field"].as_str().unwrap_or("")
901 });
902 }
903
904 for item in data.members() {
905 let name = item["name"].as_str().unwrap_or("");
906 if !hidden.contains(&name) {
907 self.params.fields[name] = name.into();
908 }
909 }
910 self
911 }
912
913 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
914 for f in field.split('|') {
915 if !super::sql_safety::validate_field_name(f) {
916 error!("Invalid field name: {}", f);
917 }
918 }
919 if !super::sql_safety::validate_compare_orator(compare) {
920 error!("Invalid compare operator: {}", compare);
921 }
922 let join_table = if self.params.join_table.is_empty() {
923 self.params.table.clone()
924 } else {
925 self.params.join_table.clone()
926 };
927 if value.is_boolean() {
928 let bool_val = value.as_bool().unwrap_or(false);
929 self.params
930 .where_and
931 .push(format!("{join_table}.{field} {compare} {bool_val}"));
932 return self;
933 }
934 match compare {
935 "between" => {
936 self.params.where_and.push(format!(
937 "{}.{} between '{}' AND '{}'",
938 join_table, field, value[0], value[1]
939 ));
940 }
941 "set" => {
942 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
943 let mut wheredata = vec![];
944 for item in list.iter() {
945 wheredata.push(format!(
946 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
947 ));
948 }
949 self.params
950 .where_and
951 .push(format!("({})", wheredata.join(" or ")));
952 }
953 "notin" => {
954 let mut text = String::new();
955 for item in value.members() {
956 text = format!("{text},'{item}'");
957 }
958 text = text.trim_start_matches(",").into();
959 self.params
960 .where_and
961 .push(format!("{join_table}.{field} not in ({text})"));
962 }
963 "is" => {
964 self.params
965 .where_and
966 .push(format!("{join_table}.{field} is {value}"));
967 }
968 "isnot" => {
969 self.params
970 .where_and
971 .push(format!("{join_table}.{field} is not {value}"));
972 }
973 "notlike" => {
974 self.params
975 .where_and
976 .push(format!("{join_table}.{field} not like '{value}'"));
977 }
978 "in" => {
979 if value.is_array() && value.is_empty() {
980 self.params.where_and.push("1=0".to_string());
981 return self;
982 }
983 let mut text = String::new();
984 if value.is_array() {
985 for item in value.members() {
986 text = format!("{text},'{item}'");
987 }
988 } else if value.is_null() {
989 text = format!("{text},null");
990 } else {
991 let value = value.as_str().unwrap_or("");
992
993 let value: Vec<&str> = value.split(",").collect();
994 for item in value.iter() {
995 text = format!("{text},'{item}'");
996 }
997 }
998 text = text.trim_start_matches(",").into();
999
1000 self.params
1001 .where_and
1002 .push(format!("{join_table}.{field} {compare} ({text})"));
1003 }
1004 "json_contains" => {
1008 if value.is_array() {
1009 if value.is_empty() {
1010 self.params.where_and.push("1=0".to_string());
1011 } else {
1012 let mut parts = vec![];
1013 for item in value.members() {
1014 let escaped = super::sql_safety::escape_string(&item.to_string());
1015 parts.push(format!(
1016 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1017 escaped
1018 ));
1019 }
1020 self.params
1021 .where_and
1022 .push(format!("({})", parts.join(" OR ")));
1023 }
1024 } else {
1025 let escaped = super::sql_safety::escape_string(&value.to_string());
1026 self.params.where_and.push(format!(
1027 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1028 escaped
1029 ));
1030 }
1031 }
1032 _ => {
1033 self.params
1034 .where_and
1035 .push(format!("{join_table}.{field} {compare} '{value}'"));
1036 }
1037 }
1038 self
1039 }
1040
1041 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1042 for f in field.split('|') {
1043 if !super::sql_safety::validate_field_name(f) {
1044 error!("Invalid field name: {}", f);
1045 }
1046 }
1047 if !super::sql_safety::validate_compare_orator(compare) {
1048 error!("Invalid compare operator: {}", compare);
1049 }
1050 let join_table = if self.params.join_table.is_empty() {
1051 self.params.table.clone()
1052 } else {
1053 self.params.join_table.clone()
1054 };
1055
1056 if value.is_boolean() {
1057 let bool_val = value.as_bool().unwrap_or(false);
1058 self.params
1059 .where_or
1060 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1061 return self;
1062 }
1063
1064 match compare {
1065 "between" => {
1066 self.params.where_or.push(format!(
1067 "{}.{} between '{}' AND '{}'",
1068 join_table, field, value[0], value[1]
1069 ));
1070 }
1071 "set" => {
1072 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1073 let mut wheredata = vec![];
1074 for item in list.iter() {
1075 wheredata.push(format!(
1076 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1077 ));
1078 }
1079 self.params
1080 .where_or
1081 .push(format!("({})", wheredata.join(" or ")));
1082 }
1083 "notin" => {
1084 let mut text = String::new();
1085 for item in value.members() {
1086 text = format!("{text},'{item}'");
1087 }
1088 text = text.trim_start_matches(",").into();
1089 self.params
1090 .where_or
1091 .push(format!("{join_table}.{field} not in ({text})"));
1092 }
1093 "is" => {
1094 self.params
1095 .where_or
1096 .push(format!("{join_table}.{field} is {value}"));
1097 }
1098 "isnot" => {
1099 self.params
1100 .where_or
1101 .push(format!("{join_table}.{field} is not {value}"));
1102 }
1103 "in" => {
1104 if value.is_array() && value.is_empty() {
1105 self.params.where_or.push("1=0".to_string());
1106 return self;
1107 }
1108 let mut text = String::new();
1109 if value.is_array() {
1110 for item in value.members() {
1111 text = format!("{text},'{item}'");
1112 }
1113 } else {
1114 let value = value.as_str().unwrap_or("");
1115 let value: Vec<&str> = value.split(",").collect();
1116 for item in value.iter() {
1117 text = format!("{text},'{item}'");
1118 }
1119 }
1120 text = text.trim_start_matches(",").into();
1121 self.params
1122 .where_or
1123 .push(format!("{join_table}.{field} {compare} ({text})"));
1124 }
1125 "json_contains" => {
1129 if value.is_array() {
1130 if value.is_empty() {
1131 self.params.where_or.push("1=0".to_string());
1132 } else {
1133 let mut parts = vec![];
1134 for item in value.members() {
1135 let escaped = super::sql_safety::escape_string(&item.to_string());
1136 parts.push(format!(
1137 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1138 escaped
1139 ));
1140 }
1141 self.params
1142 .where_or
1143 .push(format!("({})", parts.join(" OR ")));
1144 }
1145 } else {
1146 let escaped = super::sql_safety::escape_string(&value.to_string());
1147 self.params.where_or.push(format!(
1148 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1149 escaped
1150 ));
1151 }
1152 }
1153 _ => {
1154 self.params
1155 .where_or
1156 .push(format!("{join_table}.{field} {compare} '{value}'"));
1157 }
1158 }
1159 self
1160 }
1161
1162 fn where_raw(&mut self, expr: &str) -> &mut Self {
1163 self.params.where_and.push(expr.to_string());
1164 self
1165 }
1166
1167 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1168 self.params
1169 .where_and
1170 .push(format!("\"{field}\" IN ({sub_sql})"));
1171 self
1172 }
1173
1174 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1175 self.params
1176 .where_and
1177 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1178 self
1179 }
1180
1181 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1182 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1183 self
1184 }
1185
1186 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1187 self.params
1188 .where_and
1189 .push(format!("NOT EXISTS ({sub_sql})"));
1190 self
1191 }
1192
1193 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1194 self.params.where_column = format!(
1195 "{}.{} {} {}.{}",
1196 self.params.table, field_a, compare, self.params.table, field_b
1197 );
1198 self
1199 }
1200
1201 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1202 self.params
1203 .update_column
1204 .push(format!("{field_a} = {compare}"));
1205 self
1206 }
1207
1208 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1209 self.params.page = page;
1210 self.params.limit = limit;
1211 self
1212 }
1213
1214 fn limit(&mut self, count: i32) -> &mut Self {
1215 self.params.limit_only = count;
1216 self
1217 }
1218
1219 fn column(&mut self, field: &str) -> JsonValue {
1220 self.field(field);
1221 let sql = self.params.select_sql();
1222
1223 if self.params.sql {
1224 return JsonValue::from(sql);
1225 }
1226 let (state, data) = self.query(sql.as_str());
1227 match state {
1228 true => {
1229 let mut list = array![];
1230 for item in data.members() {
1231 if self.params.json[field].is_empty() {
1232 let _ = list.push(item[field].clone());
1233 } else {
1234 let data =
1235 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1236 let _ = list.push(data);
1237 }
1238 }
1239 list
1240 }
1241 false => {
1242 array![]
1243 }
1244 }
1245 }
1246
1247 fn count(&mut self) -> JsonValue {
1248 self.params.fields = json::object! {};
1249 self.params.fields["count"] = "count(*) as count".to_string().into();
1250 let sql = self.params.select_sql();
1251 if self.params.sql {
1252 return JsonValue::from(sql.clone());
1253 }
1254 let (state, data) = self.query(sql.as_str());
1255 if state {
1256 data[0]["count"].clone()
1257 } else {
1258 JsonValue::from(0)
1259 }
1260 }
1261
1262 fn max(&mut self, field: &str) -> JsonValue {
1263 self.params.fields[field] = format!("max({field}) as {field}").into();
1264 let sql = self.params.select_sql();
1265 if self.params.sql {
1266 return JsonValue::from(sql.clone());
1267 }
1268 let (state, data) = self.query(sql.as_str());
1269 if state {
1270 if data.len() > 1 {
1271 return data.clone();
1272 }
1273 data[0][field].clone()
1274 } else {
1275 JsonValue::from(0)
1276 }
1277 }
1278
1279 fn min(&mut self, field: &str) -> JsonValue {
1280 self.params.fields[field] = format!("min({field}) as {field}").into();
1281 let sql = self.params.select_sql();
1282 if self.params.sql {
1283 return JsonValue::from(sql.clone());
1284 }
1285 let (state, data) = self.query(sql.as_str());
1286 if state {
1287 if data.len() > 1 {
1288 return data;
1289 }
1290 data[0][field].clone()
1291 } else {
1292 JsonValue::from(0)
1293 }
1294 }
1295
1296 fn sum(&mut self, field: &str) -> JsonValue {
1297 self.params.fields[field] = format!("sum({field}) as {field}").into();
1298 let sql = self.params.select_sql();
1299 if self.params.sql {
1300 return JsonValue::from(sql.clone());
1301 }
1302 let (state, data) = self.query(sql.as_str());
1303 match state {
1304 true => {
1305 if data.len() > 1 {
1306 return data;
1307 }
1308 data[0][field].clone()
1309 }
1310 false => JsonValue::from(0),
1311 }
1312 }
1313
1314 fn avg(&mut self, field: &str) -> JsonValue {
1315 self.params.fields[field] = format!("avg({field}) as {field}").into();
1316 let sql = self.params.select_sql();
1317 if self.params.sql {
1318 return JsonValue::from(sql.clone());
1319 }
1320 let (state, data) = self.query(sql.as_str());
1321 if state {
1322 if data.len() > 1 {
1323 return data;
1324 }
1325 data[0][field].clone()
1326 } else {
1327 JsonValue::from(0)
1328 }
1329 }
1330
1331 fn having(&mut self, expr: &str) -> &mut Self {
1332 self.params.having.push(expr.to_string());
1333 self
1334 }
1335
1336 fn select(&mut self) -> JsonValue {
1337 let sql = self.params.select_sql();
1338 if self.params.sql {
1339 return JsonValue::from(sql.clone());
1340 }
1341 let (state, mut data) = self.query(sql.as_str());
1342 match state {
1343 true => {
1344 for (field, _) in self.params.json.entries() {
1345 for item in data.members_mut() {
1346 if !item[field].is_empty() {
1347 let json = item[field].to_string();
1348 item[field] = match json::parse(&json) {
1349 Ok(e) => e,
1350 Err(_) => JsonValue::from(json),
1351 };
1352 }
1353 }
1354 }
1355 data.clone()
1356 }
1357 false => array![],
1358 }
1359 }
1360
1361 fn find(&mut self) -> JsonValue {
1362 self.params.page = 1;
1363 self.params.limit = 1;
1364 let sql = self.params.select_sql();
1365 if self.params.sql {
1366 return JsonValue::from(sql.clone());
1367 }
1368 let (state, mut data) = self.query(sql.as_str());
1369 match state {
1370 true => {
1371 if data.is_empty() {
1372 return object! {};
1373 }
1374 for (field, _) in self.params.json.entries() {
1375 if !data[0][field].is_empty() {
1376 let json = data[0][field].to_string();
1377 let json = json::parse(&json).unwrap_or(array![]);
1378 data[0][field] = json;
1379 } else {
1380 data[0][field] = array![];
1381 }
1382 }
1383 data[0].clone()
1384 }
1385 false => {
1386 object! {}
1387 }
1388 }
1389 }
1390
1391 fn value(&mut self, field: &str) -> JsonValue {
1392 self.params.fields = object! {};
1393 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1394 self.params.page = 1;
1395 self.params.limit = 1;
1396 let sql = self.params.select_sql();
1397 if self.params.sql {
1398 return JsonValue::from(sql.clone());
1399 }
1400 let (state, mut data) = self.query(sql.as_str());
1401 match state {
1402 true => {
1403 for (field, _) in self.params.json.entries() {
1404 if !data[0][field].is_empty() {
1405 let json = data[0][field].to_string();
1406 let json = json::parse(&json).unwrap_or(array![]);
1407 data[0][field] = json;
1408 } else {
1409 data[0][field] = array![];
1410 }
1411 }
1412 data[0][field].clone()
1413 }
1414 false => {
1415 if self.connection.debug {
1416 info!("{data:?}");
1417 }
1418 JsonValue::Null
1419 }
1420 }
1421 }
1422
1423 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1424 let fields_list = self.table_info(&self.params.table.clone());
1425 let mut fields = vec![];
1426 let mut values = vec![];
1427 if !self.params.autoinc && data["id"].is_empty() {
1428 let thread_id = format!("{:?}", std::thread::current().id());
1429 let thread_num: u64 = thread_id
1430 .trim_start_matches("ThreadId(")
1431 .trim_end_matches(")")
1432 .parse()
1433 .unwrap_or(0);
1434 data["id"] = format!(
1435 "{:X}{:X}",
1436 Local::now().timestamp_nanos_opt().unwrap_or(0),
1437 thread_num
1438 )
1439 .into();
1440 }
1441 for (field, value) in data.entries() {
1442 fields.push(format!("\"{}\"", field));
1443
1444 if value.is_string() {
1445 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1446 continue;
1447 } else if value.is_array() {
1448 if self.params.json[field].is_empty() {
1449 let array = value
1450 .members()
1451 .map(|x| x.as_str().unwrap_or(""))
1452 .collect::<Vec<&str>>()
1453 .join(",");
1454 values.push(format!("'{array}'"));
1455 } else {
1456 let json = value.to_string();
1457 let json = json.replace("'", "''");
1458 values.push(format!("'{json}'"));
1459 }
1460 continue;
1461 } else if value.is_object() {
1462 if self.params.json[field].is_empty() {
1463 values.push(format!("'{value}'"));
1464 } else {
1465 let json = value.to_string();
1466 let json = json.replace("'", "''");
1467 values.push(format!("'{json}'"));
1468 }
1469 continue;
1470 } else if value.is_number() {
1471 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1472 if col_type == "boolean" {
1473 let bool_val = value.as_i64().unwrap_or(0) != 0;
1474 values.push(format!("{bool_val}"));
1475 } else if col_type.contains("int") {
1476 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1477 } else {
1478 values.push(format!("{value}"));
1479 }
1480 continue;
1481 } else if value.is_boolean() || value.is_null() {
1482 values.push(format!("{value}"));
1483 continue;
1484 } else {
1485 values.push(format!("'{value}'"));
1486 continue;
1487 }
1488 }
1489 let fields = fields.join(",");
1490 let values = values.join(",");
1491
1492 let sql = format!(
1493 "INSERT INTO {} ({}) VALUES ({});",
1494 self.params.table, fields, values
1495 );
1496 if self.params.sql {
1497 return JsonValue::from(sql.clone());
1498 }
1499 let (state, ids) = self.execute(sql.as_str());
1500
1501 match state {
1502 true => match self.params.autoinc {
1503 true => ids.clone(),
1504 false => data["id"].clone(),
1505 },
1506 false => {
1507 let thread_id = format!("{:?}", thread::current().id());
1508 error!("添加失败: {thread_id} {ids:?} {sql}");
1509 JsonValue::from("")
1510 }
1511 }
1512 }
1513 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1514 let fields_list = self.table_info(&self.params.table.clone());
1515 let mut fields = String::new();
1516 if !self.params.autoinc && data[0]["id"].is_empty() {
1517 data[0]["id"] = "".into();
1518 }
1519 for (field, _) in data[0].entries() {
1520 fields = format!("{fields},\"{field}\"");
1521 }
1522 fields = fields.trim_start_matches(",").to_string();
1523
1524 let core_count = num_cpus::get();
1525 let mut p = pools::Pool::new(core_count * 4);
1526
1527 let autoinc = self.params.autoinc;
1528 for list in data.members() {
1529 let mut item = list.clone();
1530 let i = br_fields::str::Code::verification_code(3);
1531 let fields_list_new = fields_list.clone();
1532 p.execute(move |pcindex| {
1533 if !autoinc && item["id"].is_empty() {
1534 let id = format!(
1535 "{:X}{:X}{}",
1536 Local::now().timestamp_nanos_opt().unwrap_or(0),
1537 pcindex,
1538 i
1539 );
1540 item["id"] = id.into();
1541 }
1542 let mut values = "".to_string();
1543 for (field, value) in item.entries() {
1544 if value.is_string() {
1545 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1546 } else if value.is_number() {
1547 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1548 if col_type == "boolean" {
1549 let bool_val = value.as_i64().unwrap_or(0) != 0;
1550 values = format!("{values},{bool_val}");
1551 } else if col_type.contains("int") {
1552 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1553 } else {
1554 values = format!("{values},{value}");
1555 }
1556 } else if value.is_boolean() {
1557 values = format!("{values},{value}");
1558 continue;
1559 } else {
1560 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1561 }
1562 }
1563 values = format!("({})", values.trim_start_matches(","));
1564 array![item["id"].clone(), values]
1565 });
1566 }
1567 let (ids_list, mut values) = p.insert_all();
1568 values = values.trim_start_matches(",").to_string();
1569 let sql = format!(
1570 "INSERT INTO {} ({}) VALUES {};",
1571 self.params.table, fields, values
1572 );
1573
1574 if self.params.sql {
1575 return JsonValue::from(sql.clone());
1576 }
1577 let (state, data) = self.execute(sql.as_str());
1578 match state {
1579 true => match autoinc {
1580 true => data,
1581 false => JsonValue::from(ids_list),
1582 },
1583 false => {
1584 error!("insert_all: {data:?}");
1585 let _ = std::fs::write("pgsql_insert_all_error.log", format!("insert_all error:\n{:?}\n\nSQL:\n{}", data, sql));
1586 array![]
1587 }
1588 }
1589 }
1590 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1591 let fields_list = self.table_info(&self.params.table.clone());
1592 let mut fields = vec![];
1593 let mut values = vec![];
1594 if !self.params.autoinc && data["id"].is_empty() {
1595 let thread_id = format!("{:?}", std::thread::current().id());
1596 let thread_num: u64 = thread_id
1597 .trim_start_matches("ThreadId(")
1598 .trim_end_matches(")")
1599 .parse()
1600 .unwrap_or(0);
1601 data["id"] = format!(
1602 "{:X}{:X}",
1603 Local::now().timestamp_nanos_opt().unwrap_or(0),
1604 thread_num
1605 )
1606 .into();
1607 }
1608 for (field, value) in data.entries() {
1609 fields.push(format!("\"{}\"", field));
1610
1611 if value.is_string() {
1612 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1613 continue;
1614 } else if value.is_array() {
1615 if self.params.json[field].is_empty() {
1616 let array = value
1617 .members()
1618 .map(|x| x.as_str().unwrap_or(""))
1619 .collect::<Vec<&str>>()
1620 .join(",");
1621 values.push(format!("'{array}'"));
1622 } else {
1623 let json = value.to_string();
1624 let json = json.replace("'", "''");
1625 values.push(format!("'{json}'"));
1626 }
1627 continue;
1628 } else if value.is_object() {
1629 if self.params.json[field].is_empty() {
1630 values.push(format!("'{value}'"));
1631 } else {
1632 let json = value.to_string();
1633 let json = json.replace("'", "''");
1634 values.push(format!("'{json}'"));
1635 }
1636 continue;
1637 } else if value.is_number() {
1638 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1639 if col_type == "boolean" {
1640 let bool_val = value.as_i64().unwrap_or(0) != 0;
1641 values.push(format!("{bool_val}"));
1642 } else if col_type.contains("int") {
1643 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1644 } else {
1645 values.push(format!("{value}"));
1646 }
1647 continue;
1648 } else if value.is_boolean() || value.is_null() {
1649 values.push(format!("{value}"));
1650 continue;
1651 } else {
1652 values.push(format!("'{value}'"));
1653 continue;
1654 }
1655 }
1656
1657 let conflict_cols: Vec<String> = conflict_fields
1658 .iter()
1659 .map(|f| format!("\"{}\"", f))
1660 .collect();
1661
1662 let update_set: Vec<String> = fields
1663 .iter()
1664 .filter(|f| {
1665 let name = f.trim_matches('"');
1666 !conflict_fields.contains(&name) && name != "id"
1667 })
1668 .map(|f| format!("{f}=EXCLUDED.{f}"))
1669 .collect();
1670
1671 let fields_str = fields.join(",");
1672 let values_str = values.join(",");
1673
1674 let sql = format!(
1675 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1676 self.params.table,
1677 fields_str,
1678 values_str,
1679 conflict_cols.join(","),
1680 update_set.join(",")
1681 );
1682 if self.params.sql {
1683 return JsonValue::from(sql.clone());
1684 }
1685 let (state, result) = self.execute(sql.as_str());
1686 match state {
1687 true => match self.params.autoinc {
1688 true => result.clone(),
1689 false => data["id"].clone(),
1690 },
1691 false => {
1692 let thread_id = format!("{:?}", thread::current().id());
1693 error!("upsert失败: {thread_id} {result:?} {sql}");
1694 JsonValue::from("")
1695 }
1696 }
1697 }
1698 fn update(&mut self, data: JsonValue) -> JsonValue {
1699 let fields_list = self.table_info(&self.params.table.clone());
1700 let mut values = vec![];
1701 for (field, value) in data.entries() {
1702 if value.is_string() {
1703 values.push(format!(
1704 "\"{}\"='{}'",
1705 field,
1706 value.to_string().replace("'", "''")
1707 ));
1708 } else if value.is_number() {
1709 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1710 if col_type == "boolean" {
1711 let bool_val = value.as_i64().unwrap_or(0) != 0;
1712 values.push(format!("\"{field}\"= {bool_val}"));
1713 } else if col_type.contains("int") {
1714 values.push(format!(
1715 "\"{}\"= {}",
1716 field,
1717 value.as_f64().unwrap_or(0.0) as i64
1718 ));
1719 } else {
1720 values.push(format!("\"{field}\"= {value}"));
1721 }
1722 } else if value.is_array() {
1723 if self.params.json[field].is_empty() {
1724 let array = value
1725 .members()
1726 .map(|x| x.as_str().unwrap_or(""))
1727 .collect::<Vec<&str>>()
1728 .join(",");
1729 values.push(format!("\"{field}\"='{array}'"));
1730 } else {
1731 let json = value.to_string();
1732 let json = json.replace("'", "''");
1733 values.push(format!("\"{field}\"='{json}'"));
1734 }
1735 continue;
1736 } else if value.is_object() {
1737 if self.params.json[field].is_empty() {
1738 values.push(format!("\"{field}\"='{value}'"));
1739 } else {
1740 if value.is_empty() {
1741 values.push(format!("\"{field}\"=''"));
1742 continue;
1743 }
1744 let json = value.to_string();
1745 let json = json.replace("'", "''");
1746 values.push(format!("\"{field}\"='{json}'"));
1747 }
1748 continue;
1749 } else if value.is_boolean() {
1750 values.push(format!("\"{field}\"= {value}"));
1751 } else {
1752 values.push(format!("\"{field}\"=\"{value}\""));
1753 }
1754 }
1755
1756 for (field, value) in self.params.inc_dec.entries() {
1757 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1758 }
1759 if !self.params.update_column.is_empty() {
1760 values.extend(self.params.update_column.clone());
1761 }
1762 let values = values.join(",");
1763
1764 let sql = format!(
1765 "UPDATE {} SET {} {};",
1766 self.params.table.clone(),
1767 values,
1768 self.params.where_sql()
1769 );
1770 if self.params.sql {
1771 return JsonValue::from(sql.clone());
1772 }
1773 let (state, data) = self.execute(sql.as_str());
1774 if state {
1775 data
1776 } else {
1777 let thread_id = format!("{:?}", thread::current().id());
1778 error!("update: {thread_id} {data:?} {sql}");
1779 0.into()
1780 }
1781 }
1782 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1783 let fields_list = self.table_info(&self.params.table.clone());
1784 let mut values = vec![];
1785
1786 let mut ids = vec![];
1787 for (field, _) in data[0].entries() {
1788 if field == "id" {
1789 continue;
1790 }
1791 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1792 let mut fields = vec![];
1793 for row in data.members() {
1794 let value = row[field].clone();
1795 let id = row["id"].clone();
1796 ids.push(id.clone());
1797 if value.is_string() {
1798 fields.push(format!(
1799 "WHEN '{}' THEN '{}'",
1800 id,
1801 value.to_string().replace("'", "''")
1802 ));
1803 } else if value.is_array() || value.is_object() {
1804 if self.params.json[field].is_empty() {
1805 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1806 } else {
1807 let json = value.to_string();
1808 let json = json.replace("'", "''");
1809 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1810 }
1811 continue;
1812 } else if value.is_number() {
1813 if col_type == "boolean" {
1814 let bool_val = value.as_i64().unwrap_or(0) != 0;
1815 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1816 } else {
1817 fields.push(format!("WHEN '{id}' THEN {value}"));
1818 }
1819 } else if value.is_boolean() || value.is_null() {
1820 fields.push(format!("WHEN '{id}' THEN {value}"));
1821 } else {
1822 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1823 }
1824 }
1825 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1826 }
1827 self.where_and("id", "in", ids.into());
1828 for (field, value) in self.params.inc_dec.entries() {
1829 values.push(format!("{} = {}", field, value.to_string().clone()));
1830 }
1831
1832 let values = values.join(",");
1833 let sql = format!(
1834 "UPDATE {} SET {} {} {};",
1835 self.params.table.clone(),
1836 values,
1837 self.params.where_sql(),
1838 self.params.page_limit_sql()
1839 );
1840 if self.params.sql {
1841 return JsonValue::from(sql.clone());
1842 }
1843 let (state, data) = self.execute(sql.as_str());
1844 if state {
1845 data
1846 } else {
1847 error!("update_all: {data:?}");
1848 JsonValue::from(0)
1849 }
1850 }
1851
1852 fn delete(&mut self) -> JsonValue {
1853 let sql = format!(
1854 "delete FROM {} {} {};",
1855 self.params.table.clone(),
1856 self.params.where_sql(),
1857 self.params.page_limit_sql()
1858 );
1859 if self.params.sql {
1860 return JsonValue::from(sql.clone());
1861 }
1862 let (state, data) = self.execute(sql.as_str());
1863 match state {
1864 true => data,
1865 false => {
1866 error!("delete 失败>>> {data:?}");
1867 JsonValue::from(0)
1868 }
1869 }
1870 }
1871
1872 fn transaction(&mut self) -> bool {
1873 let thread_id = format!("{:?}", thread::current().id());
1874 let key = format!("{}{}", self.default, thread_id);
1875
1876 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1877 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1878 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1879 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1880 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1881 return true;
1882 }
1883
1884 let mut conn = None;
1886 for attempt in 0..3u8 {
1887 match self.client.get_connect_for_transaction() {
1888 Ok(mut c) => {
1889 if c.is_valid() {
1890 conn = Some(c);
1891 break;
1892 }
1893 warn!("事务连接无效(第{}次)", attempt + 1);
1894 self.client.release_transaction_conn();
1895 }
1896 Err(e) => {
1897 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1898 }
1899 }
1900 if attempt < 2 {
1901 thread::sleep(std::time::Duration::from_millis(200));
1902 }
1903 }
1904 let mut conn = match conn {
1905 Some(c) => c,
1906 None => {
1907 error!("获取事务连接重试耗尽");
1908 return false;
1909 }
1910 };
1911
1912 if let Err(e) = conn.execute("START TRANSACTION") {
1913 error!("启动事务失败: {e}");
1914 self.client.release_transaction_conn();
1915 return false;
1916 }
1917
1918
1919 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1920 true
1921 }
1922 fn commit(&mut self) -> bool {
1923 let thread_id = format!("{:?}", thread::current().id());
1924 let key = format!("{}{}", self.default, thread_id);
1925
1926 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1927 error!("commit: 没有活跃的事务");
1928 return false;
1929 }
1930
1931 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1932 if depth > 1 {
1933 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1934 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1935 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1936 return true;
1937 }
1938
1939 let commit_result =
1940 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1941
1942 let success = match commit_result {
1943 Some(Ok(_)) => true,
1944 Some(Err(e)) => {
1945 error!("提交事务失败: {e}");
1946 false
1947 }
1948 None => {
1949 error!("提交事务失败: 未找到连接");
1950 false
1951 }
1952 };
1953
1954 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1955 self.client.release_transaction_conn();
1956 success
1957 }
1958
1959 fn rollback(&mut self) -> bool {
1960 let thread_id = format!("{:?}", thread::current().id());
1961 let key = format!("{}{}", self.default, thread_id);
1962
1963 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1964 error!("rollback: 没有活跃的事务");
1965 return false;
1966 }
1967
1968 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1969 if depth > 1 {
1970 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1971 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1972 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1973 return true;
1974 }
1975
1976 let rollback_result =
1977 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1978
1979 let success = match rollback_result {
1980 Some(Ok(_)) => true,
1981 Some(Err(e)) => {
1982 error!("回滚失败: {e}");
1983 false
1984 }
1985 None => {
1986 error!("回滚失败: 未找到连接");
1987 false
1988 }
1989 };
1990
1991 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1992 self.client.release_transaction_conn();
1993 success
1994 }
1995
1996 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1997 let (state, data) = self.query(sql);
1998 match state {
1999 true => Ok(data),
2000 false => Err(data.to_string()),
2001 }
2002 }
2003
2004 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2005 let (state, data) = self.execute(sql);
2006 match state {
2007 true => Ok(data),
2008 false => Err(data.to_string()),
2009 }
2010 }
2011
2012 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2013 self.params.inc_dec[field] = format!("{field} + {num}").into();
2014 self
2015 }
2016
2017 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2018 self.params.inc_dec[field] = format!("{field} - {num}").into();
2019 self
2020 }
2021 fn buildsql(&mut self) -> String {
2022 self.fetch_sql();
2023 let sql = self.select().to_string();
2024 format!("( {} ) {}", sql, self.params.table)
2025 }
2026
2027 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2028 for field in fields {
2029 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2030 }
2031 self
2032 }
2033
2034 fn join(
2035 &mut self,
2036 main_table: &str,
2037 main_fields: &str,
2038 right_table: &str,
2039 right_fields: &str,
2040 ) -> &mut Self {
2041 let main_table = if main_table.is_empty() {
2042 self.params.table.clone()
2043 } else {
2044 main_table.to_string()
2045 };
2046 self.params.join_table = right_table.to_string();
2047 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2048 self
2049 }
2050
2051 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2052 let main_fields = if main_fields.is_empty() {
2053 "id"
2054 } else {
2055 main_fields
2056 };
2057 let second_fields = if second_fields.is_empty() {
2058 self.params.table.clone()
2059 } else {
2060 second_fields.to_string().clone()
2061 };
2062 let sec_table_name = format!("{}{}", table, "_2");
2063 let second_table = format!("{} {}", table, sec_table_name.clone());
2064 self.params.join_table = sec_table_name.clone();
2065 self.params.join.push(format!(
2066 " INNER JOIN {} ON {}.{} = {}.{}",
2067 second_table, self.params.table, main_fields, sec_table_name, second_fields
2068 ));
2069 self
2070 }
2071
2072 fn join_right(
2073 &mut self,
2074 main_table: &str,
2075 main_fields: &str,
2076 right_table: &str,
2077 right_fields: &str,
2078 ) -> &mut Self {
2079 let main_table = if main_table.is_empty() {
2080 self.params.table.clone()
2081 } else {
2082 main_table.to_string()
2083 };
2084 self.params.join_table = right_table.to_string();
2085 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2086 self
2087 }
2088
2089 fn join_full(
2090 &mut self,
2091 main_table: &str,
2092 main_fields: &str,
2093 right_table: &str,
2094 right_fields: &str,
2095 ) -> &mut Self {
2096 let main_table = if main_table.is_empty() {
2097 self.params.table.clone()
2098 } else {
2099 main_table.to_string()
2100 };
2101 self.params.join_table = right_table.to_string();
2102 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2103 self
2104 }
2105
2106 fn union(&mut self, sub_sql: &str) -> &mut Self {
2107 self.params.unions.push(format!("UNION {sub_sql}"));
2108 self
2109 }
2110
2111 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2112 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2113 self
2114 }
2115
2116 fn lock_for_update(&mut self) -> &mut Self {
2117 self.params.lock_mode = "FOR UPDATE".to_string();
2118 self
2119 }
2120
2121 fn lock_for_share(&mut self) -> &mut Self {
2122 self.params.lock_mode = "FOR SHARE".to_string();
2123 self
2124 }
2125}