1use crate::config::Connection;
2use crate::TABLE_FIELDS;
3use crate::pools;
4use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
5use crate::types::{DbMode, Mode, Params, TableOptions};
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
/// PostgreSQL backend handle: connection settings, query-builder state and the
/// connection pool used to run statements.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Name supplied to `connect`; it prefixes the transaction keys and the
    /// table-metadata cache keys built by the methods of this type.
    pub default: String,
    /// State accumulated by the builder methods (table, fields, conditions, ...).
    pub params: Params,
    /// Pool from which connections for non-transactional statements are taken.
    pub client: Pools,
}
21
22impl Pgsql {
23 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24 let port = connection
25 .hostport
26 .parse::<i32>()
27 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29 let cp_connection = connection.clone();
30 let config = object! {
31 debug: cp_connection.debug,
32 username: cp_connection.username,
33 userpass: cp_connection.userpass,
34 database: cp_connection.database,
35 hostname: cp_connection.hostname,
36 hostport: port,
37 charset: cp_connection.charset.str(),
38 };
39 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
40
41 let pools = pgsql.pools()?;
42 Ok(Self {
43 connection,
44 default: default.clone(),
45 params: Params::default("pgsql"),
46 client: pools,
47 })
48 }
49
50 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
51 let thread_id = format!("{:?}", thread::current().id());
52 let key = format!("{}{}", self.default, thread_id);
53
54 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
55 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
56
57 match result {
58 Some(Ok(e)) => {
59 if self.connection.debug {
60 info!("查询成功: {} {}", thread_id.clone(), sql);
61 }
62 (true, e.rows)
63 }
64 Some(Err(e)) => {
65 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
66 (false, JsonValue::from(e.to_string()))
67 }
68 None => {
69 error!("事务查询失败: 未找到事务连接 {thread_id}");
70 (false, JsonValue::from("未找到事务连接"))
71 }
72 }
73 } else {
74 let mut guard = match self.client.get_guard() {
75 Ok(g) => g,
76 Err(e) => {
77 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
79 return (false, JsonValue::from(e.to_string()));
80 }
81 };
82 match guard.conn().query(sql) {
83 Ok(e) => {
84 if self.connection.debug {
85 info!("查询成功: {} {}", thread_id.clone(), sql);
86 }
87 (true, e.rows)
88 }
89 Err(ref e) if Self::is_retriable_error(e) => {
90 guard.discard();
93 self.client.flush_idle();
95 warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
96 thread::sleep(std::time::Duration::from_millis(200));
97 let mut guard2 = match self.client.get_guard() {
98 Ok(g) => g,
99 Err(e) => {
100 error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
101 return (false, JsonValue::from(e.to_string()));
102 }
103 };
104 match guard2.conn().query(sql) {
105 Ok(e) => {
106 if self.connection.debug {
107 info!("查询成功(重试): {} {}", thread_id.clone(), sql);
108 }
109 (true, e.rows)
110 }
111 Err(e) => {
112 error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
113 (false, JsonValue::from(e.to_string()))
114 }
115 }
116 }
117 Err(e) => {
118 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
119 (false, JsonValue::from(e.to_string()))
120 }
121 }
122 }
123 }
124 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
125 let thread_id = format!("{:?}", thread::current().id());
126 let key = format!("{}{}", self.default, thread_id);
127
128 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
129 if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
131 &self.params.table,
132 &thread_id,
133 std::time::Duration::from_secs(30),
134 ) {
135 error!("获取表锁超时: {} {}", self.params.table, thread_id);
136 return (false, JsonValue::from("table lock timeout"));
137 }
138 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
139
140 match result {
141 Some(Ok(e)) => {
142 if self.connection.debug {
143 info!("提交成功: {} {}", thread_id.clone(), sql);
144 }
145 if sql.contains("INSERT") {
146 (true, e.rows)
147 } else {
148 (true, e.affect_count.into())
149 }
150 }
151 Some(Err(e)) => {
152 error!("事务提交失败: {thread_id} {e}");
153 (false, JsonValue::from(e.to_string()))
154 }
155 None => {
156 error!("事务执行失败: 未找到事务连接 {thread_id}");
157 (false, JsonValue::from("未找到事务连接"))
158 }
159 }
160 } else {
161 let mut guard = match self.client.get_guard() {
163 Ok(g) => g,
164 Err(e) => {
165 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
166 return (false, JsonValue::from(e.to_string()));
167 }
168 };
169 match guard.conn().execute(sql) {
170 Ok(e) => {
171 if self.connection.debug {
172 info!("提交成功: {} {}", thread_id.clone(), sql);
173 }
174 if sql.contains("INSERT") {
175 (true, e.rows)
176 } else {
177 (true, e.affect_count.into())
178 }
179 }
180 Err(ref e) if Self::is_retriable_error(e) => {
181 guard.discard();
184 self.client.flush_idle();
186 warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
187 thread::sleep(std::time::Duration::from_millis(200));
188 let mut guard2 = match self.client.get_guard() {
189 Ok(g) => g,
190 Err(e) => {
191 error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
192 return (false, JsonValue::from(e.to_string()));
193 }
194 };
195 match guard2.conn().execute(sql) {
196 Ok(e) => {
197 if self.connection.debug {
198 info!("提交成功(重试): {} {}", thread_id.clone(), sql);
199 }
200 if sql.contains("INSERT") {
201 (true, e.rows)
202 } else {
203 (true, e.affect_count.into())
204 }
205 }
206 Err(e) => {
207 error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
208 (false, JsonValue::from(e.to_string()))
209 }
210 }
211 }
212 Err(e) => {
213 error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
214 (false, JsonValue::from(e.to_string()))
215 }
216 }
217 }
218 }
219
220 fn is_retriable_error(e: &PgsqlError) -> bool {
222 matches!(
223 e,
224 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
225 )
226 }
227}
228
229impl DbMode for Pgsql {
230 fn database_tables(&mut self) -> JsonValue {
231 let sql = "SHOW TABLES".to_string();
232 match self.sql(sql.as_str()) {
233 Ok(e) => {
234 let mut list = vec![];
235 for item in e.members() {
236 for (_, value) in item.entries() {
237 list.push(value.clone());
238 }
239 }
240 list.into()
241 }
242 Err(_) => {
243 array![]
244 }
245 }
246 }
247
248 fn database_create(&mut self, name: &str) -> bool {
249 let sql = format!("CREATE DATABASE {name}");
250
251 let (state, data) = self.execute(sql.as_str());
252 match state {
253 true => data.as_bool().unwrap_or(true),
254 false => {
255 error!("创建数据库失败: {data:?}");
256 false
257 }
258 }
259 }
260
261 fn truncate(&mut self, table: &str) -> bool {
262 let sql = format!("TRUNCATE TABLE {table}");
263 let (state, _) = self.execute(sql.as_str());
264 state
265 }
266}
267
268impl Mode for Pgsql {
269 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
270 let mut sql = String::new();
271 let mut comments = vec![];
272
273 if !options.table_unique.is_empty() {
274 let full_name = format!(
275 "{}_unique_{}",
276 options.table_name,
277 options.table_unique.join("_")
278 );
279 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
280 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
281 let unique = format!(
282 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
283 name,
284 options.table_name,
285 options.table_unique.join(",")
286 );
287 comments.push(unique);
288 }
289
290 for row in options.table_index.iter() {
291 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
292 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
293 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
294 let index = format!(
295 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
296 name,
297 options.table_name,
298 row.join(",")
299 );
300 comments.push(index);
301 }
302
303 for (name, field) in options.table_fields.entries_mut() {
304 field["table_name"] = options.table_name.clone().into();
305 let row = br_fields::field("pgsql", name, field.clone());
306 let (col_sql, meta) = if let Some(idx) = row.find("--") {
307 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
308 } else {
309 (row.trim(), None)
310 };
311 if let Some(meta) = meta {
312 comments.push(format!(
313 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
314 options.table_name, name, meta
315 ));
316 }
317 sql = format!("{} {},\r\n", sql, col_sql);
318 }
319
320 let primary_key = format!(
321 "CONSTRAINT {}_{} PRIMARY KEY ({})",
322 options.table_name, options.table_key, options.table_key
323 );
324 let sql = format!(
325 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
326 options.table_name,
327 sql.trim_end_matches(",\r\n"),
328 primary_key
329 );
330 comments.insert(0, sql);
331
332 for (_name, field) in options.table_fields.entries() {
333 let _ = field["mode"].as_str();
334 }
335
336 if self.params.sql {
337 let info = comments.join("\r\n");
338 return JsonValue::from(info);
339 }
340 for comment in comments {
341 let (state, _) = self.execute(comment.as_str());
342 match state {
343 true => {}
344 false => {
345 return JsonValue::from(state);
346 }
347 }
348 }
349 JsonValue::from(true)
350 }
351
352 fn table_update(&mut self, options: TableOptions) -> JsonValue {
353 let cache_key = format!("{}{}", self.default, options.table_name);
355 let table_fields_guard = match TABLE_FIELDS.read() {
356 Ok(g) => g,
357 Err(e) => e.into_inner(),
358 };
359 if table_fields_guard.get(&cache_key).is_some() {
360 drop(table_fields_guard);
361 let mut table_fields_guard = match TABLE_FIELDS.write() {
362 Ok(g) => g,
363 Err(e) => e.into_inner(),
364 };
365 table_fields_guard.remove(&cache_key);
366 } else {
367 drop(table_fields_guard);
368 }
369 let fields_list = self.table_info(&options.table_name);
370 let mut put = vec![];
371 let mut add = vec![];
372 let mut del = vec![];
373 let mut comments = vec![];
374
375 for (key, _) in fields_list.entries() {
376 if options.table_fields[key].is_empty() {
377 del.push(key);
378 }
379 }
380 for (name, field) in options.table_fields.entries() {
381 if !fields_list[name].is_empty() {
382 let old_info = &fields_list[name];
383 let new_field_sql = br_fields::field("pgsql", name, field.clone());
384
385 let old_comment = old_info["comment"].as_str().unwrap_or("");
386 let old_type = old_info["type"].as_str().unwrap_or("");
387
388 let new_comment = if let Some(idx) = new_field_sql.find("--") {
389 new_field_sql[idx + 2..].trim()
390 } else {
391 ""
392 };
393
394 let comment_matches =
395 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
396 let old_parts: Vec<&str> = old_comment.split('|').collect();
397 let new_parts: Vec<&str> = new_comment.split('|').collect();
398 if old_parts.len() >= 4 && new_parts.len() >= 4 {
399 old_parts[..4] == new_parts[..4]
400 } else {
401 old_comment == new_comment
402 }
403 } else if !old_comment.is_empty() && !new_comment.is_empty() {
404 let old_parts: Vec<&str> = old_comment.split('|').collect();
405 let new_parts: Vec<&str> = new_comment.split('|').collect();
406 if old_parts.len() >= 2
407 && new_parts.len() >= 2
408 && old_parts.len() == new_parts.len()
409 {
410 let old_filtered: Vec<&str> = old_parts
411 .iter()
412 .filter(|v| **v != "true" && **v != "false")
413 .copied()
414 .collect();
415 let new_filtered: Vec<&str> = new_parts
416 .iter()
417 .filter(|v| **v != "true" && **v != "false")
418 .copied()
419 .collect();
420 old_filtered == new_filtered
421 } else {
422 old_comment == new_comment
423 }
424 } else {
425 old_comment == new_comment
426 };
427
428 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
429 let new_type = if sql_parts.len() > 1 {
430 sql_parts[1].to_lowercase()
431 } else {
432 String::new()
433 };
434
435 let type_matches = match old_type {
436 "integer" => {
437 new_type.contains("int")
438 && !new_type.contains("bigint")
439 && !new_type.contains("smallint")
440 }
441 "bigint" => new_type.contains("bigint"),
442 "smallint" => new_type.contains("smallint"),
443 "boolean" => new_type.contains("boolean"),
444 "text" => new_type.contains("text"),
445 "character varying" => {
446 if !new_type.contains("varchar") {
447 false
448 } else {
449 let old_len = old_info["max_length"].as_i64().unwrap_or(0);
450 let new_len = new_type
451 .trim_start_matches("varchar(")
452 .trim_end_matches(')')
453 .parse::<i64>()
454 .unwrap_or(0);
455 let matched = old_len == new_len || new_len == 0;
456 if !matched {
457 log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
458 }
459 old_len == new_len || new_len == 0
460 }
461 }
462 "character" => new_type.contains("char") && !new_type.contains("varchar"),
463 "numeric" => {
464 if !(new_type.contains("numeric") || new_type.contains("decimal")) {
465 false
466 } else {
467 let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
468 let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
469 let inner = new_type
470 .replace("numeric(", "")
471 .replace("decimal(", "")
472 .replace(')', "");
473 let parts: Vec<&str> = inner.split(',').collect();
474 let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
475 let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
476 old_prec == new_prec && old_scale == new_scale
477 }
478 }
479 "double precision" => {
480 new_type.contains("double") || new_type.contains("float8")
481 }
482 "real" => new_type.contains("real") || new_type.contains("float4"),
483 "timestamp without time zone" | "timestamp with time zone" => {
484 new_type.contains("timestamp")
485 }
486 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
487 "time without time zone" | "time with time zone" => {
488 new_type.contains("time") && !new_type.contains("timestamp")
489 }
490 "json" | "jsonb" => new_type.contains("json"),
491 "uuid" => new_type.contains("uuid"),
492 "bytea" => new_type.contains("bytea"),
493 _ => old_type == new_type,
494 };
495
496 if type_matches && comment_matches {
497 continue;
498 }
499
500 log::debug!(
501 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
502 options.table_name,
503 name,
504 type_matches,
505 old_type,
506 new_type,
507 comment_matches
508 );
509 put.push(name);
510 } else {
511 add.push(name);
512 }
513 }
514
515 for name in add.iter() {
516 let name = name.to_string();
517 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
518 let rows = row.split("--").collect::<Vec<&str>>();
519 comments.push(format!(
520 r#"ALTER TABLE "{}" add {};"#,
521 options.table_name,
522 rows[0].trim()
523 ));
524 if rows.len() > 1 {
525 comments.push(format!(
526 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
527 options.table_name,
528 name,
529 rows[1].trim()
530 ));
531 }
532 }
533 for name in del.iter() {
534 comments.push(format!(
535 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
536 options.table_name, name
537 ));
538 }
539 for name in put.iter() {
540 let name = name.to_string();
541 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
542 let rows = row.split("--").collect::<Vec<&str>>();
543
544 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
545
546 if sql[1].contains("BOOLEAN") {
547 let text = format!(
548 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
549 options.table_name, name
550 );
551 comments.push(text.clone());
552 let text = format!(
553 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
554 options.table_name, name, sql[1]
555 );
556 comments.push(text.clone());
557 } else {
558 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
559 let new_type_lower = sql[1].to_lowercase();
560 let is_date_to_numeric = (old_col_type == "date"
561 || old_col_type.contains("timestamp"))
562 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
563 if is_date_to_numeric {
564 comments.push(format!(
565 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
566 options.table_name, name
567 ));
568 comments.push(format!(
569 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
570 options.table_name, name, sql[1], name, name, name
571 ));
572 } else {
573 let text = format!(
574 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
575 options.table_name, name, sql[1]
576 );
577 comments.push(text.clone());
578 }
579 };
580
581 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
582 let default_value = rows[0][default_pos + 9..].trim();
583 if !default_value.is_empty() {
584 comments.push(format!(
585 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
586 options.table_name, name, default_value
587 ));
588 }
589 }
590 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
593 .as_str()
594 .unwrap_or("YES");
595 let old_is_required = old_is_nullable == "NO";
596
597 if old_is_required && name != options.table_key {
599 comments.push(format!(
600 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
601 options.table_name, name
602 ));
603 }
604
605 if rows.len() > 1 {
606 comments.push(format!(
607 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
608 options.table_name,
609 name,
610 rows[1].trim()
611 ));
612 }
613 }
614
615 let mut unique_new = vec![];
616 let mut index_new = vec![];
617 let mut primary_key = vec![];
618 let (_, index_list) = self.query(
619 format!(
620 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
621 options.table_name
622 )
623 .as_str(),
624 );
625 for item in index_list.members() {
626 let key_name = item["indexname"].as_str().unwrap_or("");
627 let indexdef = item["indexdef"].to_string();
628
629 if indexdef.contains(
630 format!(
631 "CREATE UNIQUE INDEX {}_{} ON",
632 options.table_name, options.table_key
633 )
634 .as_str(),
635 ) {
636 primary_key.push(key_name.to_string());
637 continue;
638 }
639 if indexdef.contains("CREATE UNIQUE INDEX") {
640 unique_new.push(key_name.to_string());
641 continue;
642 }
643 if indexdef.contains("CREATE INDEX") {
644 index_new.push(key_name.to_string());
645 continue;
646 }
647 }
648
649 if !options.table_unique.is_empty() {
650 let full_name = format!(
651 "{}_unique_{}",
652 options.table_name,
653 options.table_unique.join("_")
654 );
655 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
656 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
657 let unique = format!(
658 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
659 name,
660 options.table_name,
661 options.table_unique.join(",")
662 );
663 if !unique_new.contains(&name) {
664 comments.push(unique);
665 }
666 unique_new.retain(|x| *x != name);
667 }
668
669 for row in options.table_index.iter() {
670 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
671 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
672 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
673 let index = format!(
674 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
675 name,
676 options.table_name,
677 row.join(",")
678 );
679 if !index_new.contains(&name) {
680 comments.push(index);
681 }
682 index_new.retain(|x| *x != name);
683 }
684
685 for item in unique_new {
686 if item.ends_with("_pkey") {
687 continue;
688 }
689 if item.starts_with("unique_") {
690 comments.push(format!(
691 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
692 options.table_name,
693 item.clone()
694 ));
695 } else {
696 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
697 }
698 }
699 for item in index_new {
700 if item.ends_with("_pkey") {
701 continue;
702 }
703 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
704 }
705
706 if self.params.sql {
707 return JsonValue::from(comments.join(""));
708 }
709
710 if comments.is_empty() {
711 return JsonValue::from(-1);
712 }
713
714 for item in comments.iter() {
715 let (state, res) = self.execute(item.as_str());
716 match state {
717 true => {}
718 false => {
719 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
720 return JsonValue::from(0);
721 }
722 }
723 }
724 JsonValue::from(1)
725 }
726
727 fn table_info(&mut self, table: &str) -> JsonValue {
728 let cache_key = format!("{}{}", self.default, table);
730 let table_fields_guard = match TABLE_FIELDS.read() {
731 Ok(g) => g,
732 Err(e) => e.into_inner(),
733 };
734 if let Some(cached) = table_fields_guard.get(&cache_key) {
735 return cached.clone();
736 }
737 drop(table_fields_guard);
738 let sql = format!(
739 "SELECT COL.COLUMN_NAME,
740 COL.DATA_TYPE,
741 COL.IS_NULLABLE,
742 COL.CHARACTER_MAXIMUM_LENGTH,
743 COL.NUMERIC_PRECISION,
744 COL.NUMERIC_SCALE,
745 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
746 LEFT JOIN
747 pg_catalog.pg_description DESCRIPTION
748 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
749 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
750 let (state, data) = self.query(sql.as_str());
751 let mut list = object! {};
752 if state {
753 for item in data.members() {
754 let mut row = object! {};
755 row["field"] = item["column_name"].clone();
756 row["comment"] = item["comment"].clone();
757 row["type"] = item["data_type"].clone();
758 row["is_nullable"] = item["is_nullable"].clone();
759 row["max_length"] = item["character_maximum_length"].clone();
760 row["numeric_precision"] = item["numeric_precision"].clone();
761 row["numeric_scale"] = item["numeric_scale"].clone();
762 if let Some(field_name) = row["field"].as_str() {
763 list[field_name] = row.clone();
764 }
765 }
766 let mut table_fields_guard = match TABLE_FIELDS.write() {
768 Ok(g) => g,
769 Err(e) => e.into_inner(),
770 };
771 table_fields_guard.insert(cache_key, list.clone());
772 list
773 } else {
774 list
775 }
776 }
777
778 fn table_is_exist(&mut self, name: &str) -> bool {
779 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
780 let (state, data) = self.query(sql.as_str());
781 match state {
782 true => {
783 for item in data.members() {
784 if item.has_key("exists") {
785 return item["exists"].as_bool().unwrap_or(false);
786 }
787 }
788 false
789 }
790 false => false,
791 }
792 }
793
794 fn table(&mut self, name: &str) -> &mut Pgsql {
795 self.params = Params::default(self.connection.mode.str().as_str());
796 let table_name = format!("{}{}", self.connection.prefix, name);
797 if !super::sql_safety::validate_table_name(&table_name) {
798 error!("Invalid table name: {}", name);
799 }
800 self.params.table = table_name.clone();
801 self.params.join_table = table_name;
802 self
803 }
804
805 fn change_table(&mut self, name: &str) -> &mut Self {
806 self.params.join_table = name.to_string();
807 self
808 }
809
810 fn autoinc(&mut self) -> &mut Self {
811 self.params.autoinc = true;
812 self
813 }
814
815 fn timestamps(&mut self) -> &mut Self {
816 self.params.timestamps = true;
817 self
818 }
819
820 fn fetch_sql(&mut self) -> &mut Self {
821 self.params.sql = true;
822 self
823 }
824
825 fn order(&mut self, field: &str, by: bool) -> &mut Self {
826 self.params.order[field] = {
827 if by {
828 "DESC"
829 } else {
830 "ASC"
831 }
832 }
833 .into();
834 self
835 }
836
837 fn group(&mut self, field: &str) -> &mut Self {
838 let fields: Vec<&str> = field.split(",").collect();
839 for field in fields.iter() {
840 let field = field.to_string();
841 self.params.group[field.as_str()] = field.clone().into();
842 self.params.fields[field.as_str()] = field.clone().into();
843 }
844 self
845 }
846
847 fn distinct(&mut self) -> &mut Self {
848 self.params.distinct = true;
849 self
850 }
851
852 fn json(&mut self, field: &str) -> &mut Self {
853 let list: Vec<&str> = field.split(",").collect();
854 for item in list.iter() {
855 self.params.json[item.to_string().as_str()] = item.to_string().into();
856 }
857 self
858 }
859
860 fn location(&mut self, field: &str) -> &mut Self {
861 let list: Vec<&str> = field.split(",").collect();
862 for item in list.iter() {
863 self.params.location[item.to_string().as_str()] = item.to_string().into();
864 }
865 self
866 }
867
868 fn field(&mut self, field: &str) -> &mut Self {
869 let list: Vec<&str> = field.split(",").collect();
870 let join_table = if self.params.join_table.is_empty() {
871 self.params.table.clone()
872 } else {
873 self.params.join_table.clone()
874 };
875 for item in list.iter() {
876 let lower = item.to_lowercase();
877 let is_expr = lower.contains("count(")
878 || lower.contains("sum(")
879 || lower.contains("avg(")
880 || lower.contains("max(")
881 || lower.contains("min(")
882 || lower.contains("case ");
883 if is_expr {
884 self.params.fields[item.to_string().as_str()] = (*item).into();
885 } else if item.contains(" as ") {
886 let text = item.split(" as ").collect::<Vec<&str>>();
887 self.params.fields[item.to_string().as_str()] =
888 format!("{}.{} as {}", join_table, text[0], text[1]).into();
889 } else {
890 self.params.fields[item.to_string().as_str()] =
891 format!("{join_table}.{item}").into();
892 }
893 }
894 self
895 }
896
897 fn field_raw(&mut self, expr: &str) -> &mut Self {
898 self.params.fields[expr] = expr.into();
899 self
900 }
901
902 fn hidden(&mut self, name: &str) -> &mut Self {
903 let hidden: Vec<&str> = name.split(",").collect();
904
905 let fields_list = self.table_info(self.params.clone().table.as_str());
906 let mut data = array![];
907 for item in fields_list.members() {
908 let _ = data.push(object! {
909 "name":item["field"].as_str().unwrap_or("")
910 });
911 }
912
913 for item in data.members() {
914 let name = item["name"].as_str().unwrap_or("");
915 if !hidden.contains(&name) {
916 self.params.fields[name] = name.into();
917 }
918 }
919 self
920 }
921
922 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
923 for f in field.split('|') {
924 if !super::sql_safety::validate_field_name(f) {
925 error!("Invalid field name: {}", f);
926 }
927 }
928 if !super::sql_safety::validate_compare_orator(compare) {
929 error!("Invalid compare operator: {}", compare);
930 }
931 let join_table = if self.params.join_table.is_empty() {
932 self.params.table.clone()
933 } else {
934 self.params.join_table.clone()
935 };
936 if value.is_boolean() {
937 let bool_val = value.as_bool().unwrap_or(false);
938 self.params
939 .where_and
940 .push(format!("{join_table}.{field} {compare} {bool_val}"));
941 return self;
942 }
943 match compare {
944 "between" => {
945 self.params.where_and.push(format!(
946 "{}.{} between '{}' AND '{}'",
947 join_table, field, value[0], value[1]
948 ));
949 }
950 "set" => {
951 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
952 let mut wheredata = vec![];
953 for item in list.iter() {
954 wheredata.push(format!(
955 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
956 ));
957 }
958 self.params
959 .where_and
960 .push(format!("({})", wheredata.join(" or ")));
961 }
962 "notin" => {
963 let mut text = String::new();
964 for item in value.members() {
965 text = format!("{text},'{item}'");
966 }
967 text = text.trim_start_matches(",").into();
968 self.params
969 .where_and
970 .push(format!("{join_table}.{field} not in ({text})"));
971 }
972 "is" => {
973 self.params
974 .where_and
975 .push(format!("{join_table}.{field} is {value}"));
976 }
977 "isnot" => {
978 self.params
979 .where_and
980 .push(format!("{join_table}.{field} is not {value}"));
981 }
982 "notlike" => {
983 self.params
984 .where_and
985 .push(format!("{join_table}.{field} not like '{value}'"));
986 }
987 "in" => {
988 if value.is_array() && value.is_empty() {
989 self.params.where_and.push("1=0".to_string());
990 return self;
991 }
992 let mut text = String::new();
993 if value.is_array() {
994 for item in value.members() {
995 text = format!("{text},'{item}'");
996 }
997 } else if value.is_null() {
998 text = format!("{text},null");
999 } else {
1000 let value = value.as_str().unwrap_or("");
1001
1002 let value: Vec<&str> = value.split(",").collect();
1003 for item in value.iter() {
1004 text = format!("{text},'{item}'");
1005 }
1006 }
1007 text = text.trim_start_matches(",").into();
1008
1009 self.params
1010 .where_and
1011 .push(format!("{join_table}.{field} {compare} ({text})"));
1012 }
1013 "json_contains" => {
1017 if value.is_array() {
1018 if value.is_empty() {
1019 self.params.where_and.push("1=0".to_string());
1020 } else {
1021 let mut parts = vec![];
1022 for item in value.members() {
1023 let escaped = super::sql_safety::escape_string(&item.to_string());
1024 parts.push(format!(
1025 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1026 escaped
1027 ));
1028 }
1029 self.params
1030 .where_and
1031 .push(format!("({})", parts.join(" OR ")));
1032 }
1033 } else {
1034 let escaped = super::sql_safety::escape_string(&value.to_string());
1035 self.params.where_and.push(format!(
1036 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1037 escaped
1038 ));
1039 }
1040 }
1041 _ => {
1042 self.params
1043 .where_and
1044 .push(format!("{join_table}.{field} {compare} '{value}'"));
1045 }
1046 }
1047 self
1048 }
1049
1050 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1051 for f in field.split('|') {
1052 if !super::sql_safety::validate_field_name(f) {
1053 error!("Invalid field name: {}", f);
1054 }
1055 }
1056 if !super::sql_safety::validate_compare_orator(compare) {
1057 error!("Invalid compare operator: {}", compare);
1058 }
1059 let join_table = if self.params.join_table.is_empty() {
1060 self.params.table.clone()
1061 } else {
1062 self.params.join_table.clone()
1063 };
1064
1065 if value.is_boolean() {
1066 let bool_val = value.as_bool().unwrap_or(false);
1067 self.params
1068 .where_or
1069 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1070 return self;
1071 }
1072
1073 match compare {
1074 "between" => {
1075 self.params.where_or.push(format!(
1076 "{}.{} between '{}' AND '{}'",
1077 join_table, field, value[0], value[1]
1078 ));
1079 }
1080 "set" => {
1081 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1082 let mut wheredata = vec![];
1083 for item in list.iter() {
1084 wheredata.push(format!(
1085 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1086 ));
1087 }
1088 self.params
1089 .where_or
1090 .push(format!("({})", wheredata.join(" or ")));
1091 }
1092 "notin" => {
1093 let mut text = String::new();
1094 for item in value.members() {
1095 text = format!("{text},'{item}'");
1096 }
1097 text = text.trim_start_matches(",").into();
1098 self.params
1099 .where_or
1100 .push(format!("{join_table}.{field} not in ({text})"));
1101 }
1102 "is" => {
1103 self.params
1104 .where_or
1105 .push(format!("{join_table}.{field} is {value}"));
1106 }
1107 "isnot" => {
1108 self.params
1109 .where_or
1110 .push(format!("{join_table}.{field} is not {value}"));
1111 }
1112 "in" => {
1113 if value.is_array() && value.is_empty() {
1114 self.params.where_or.push("1=0".to_string());
1115 return self;
1116 }
1117 let mut text = String::new();
1118 if value.is_array() {
1119 for item in value.members() {
1120 text = format!("{text},'{item}'");
1121 }
1122 } else {
1123 let value = value.as_str().unwrap_or("");
1124 let value: Vec<&str> = value.split(",").collect();
1125 for item in value.iter() {
1126 text = format!("{text},'{item}'");
1127 }
1128 }
1129 text = text.trim_start_matches(",").into();
1130 self.params
1131 .where_or
1132 .push(format!("{join_table}.{field} {compare} ({text})"));
1133 }
1134 "json_contains" => {
1138 if value.is_array() {
1139 if value.is_empty() {
1140 self.params.where_or.push("1=0".to_string());
1141 } else {
1142 let mut parts = vec![];
1143 for item in value.members() {
1144 let escaped = super::sql_safety::escape_string(&item.to_string());
1145 parts.push(format!(
1146 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1147 escaped
1148 ));
1149 }
1150 self.params
1151 .where_or
1152 .push(format!("({})", parts.join(" OR ")));
1153 }
1154 } else {
1155 let escaped = super::sql_safety::escape_string(&value.to_string());
1156 self.params.where_or.push(format!(
1157 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1158 escaped
1159 ));
1160 }
1161 }
1162 _ => {
1163 self.params
1164 .where_or
1165 .push(format!("{join_table}.{field} {compare} '{value}'"));
1166 }
1167 }
1168 self
1169 }
1170
1171 fn where_raw(&mut self, expr: &str) -> &mut Self {
1172 self.params.where_and.push(expr.to_string());
1173 self
1174 }
1175
1176 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1177 self.params
1178 .where_and
1179 .push(format!("\"{field}\" IN ({sub_sql})"));
1180 self
1181 }
1182
1183 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1184 self.params
1185 .where_and
1186 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1187 self
1188 }
1189
1190 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1191 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1192 self
1193 }
1194
1195 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1196 self.params
1197 .where_and
1198 .push(format!("NOT EXISTS ({sub_sql})"));
1199 self
1200 }
1201
1202 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1203 self.params.where_column = format!(
1204 "{}.{} {} {}.{}",
1205 self.params.table, field_a, compare, self.params.table, field_b
1206 );
1207 self
1208 }
1209
1210 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1211 self.params
1212 .update_column
1213 .push(format!("{field_a} = {compare}"));
1214 self
1215 }
1216
1217 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1218 self.params.page = page;
1219 self.params.limit = limit;
1220 self
1221 }
1222
1223 fn limit(&mut self, count: i32) -> &mut Self {
1224 self.params.limit_only = count;
1225 self
1226 }
1227
1228 fn column(&mut self, field: &str) -> JsonValue {
1229 self.field(field);
1230 let sql = self.params.select_sql();
1231
1232 if self.params.sql {
1233 return JsonValue::from(sql);
1234 }
1235 let (state, data) = self.query(sql.as_str());
1236 match state {
1237 true => {
1238 let mut list = array![];
1239 for item in data.members() {
1240 if self.params.json[field].is_empty() {
1241 let _ = list.push(item[field].clone());
1242 } else {
1243 let data =
1244 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1245 let _ = list.push(data);
1246 }
1247 }
1248 list
1249 }
1250 false => {
1251 array![]
1252 }
1253 }
1254 }
1255
1256 fn count(&mut self) -> JsonValue {
1257 self.params.fields = json::object! {};
1258 self.params.fields["count"] = "count(*) as count".to_string().into();
1259 let sql = self.params.select_sql();
1260 if self.params.sql {
1261 return JsonValue::from(sql.clone());
1262 }
1263 let (state, data) = self.query(sql.as_str());
1264 if state {
1265 data[0]["count"].clone()
1266 } else {
1267 JsonValue::from(0)
1268 }
1269 }
1270
1271 fn max(&mut self, field: &str) -> JsonValue {
1272 self.params.fields[field] = format!("max({field}) as {field}").into();
1273 let sql = self.params.select_sql();
1274 if self.params.sql {
1275 return JsonValue::from(sql.clone());
1276 }
1277 let (state, data) = self.query(sql.as_str());
1278 if state {
1279 if data.len() > 1 {
1280 return data.clone();
1281 }
1282 data[0][field].clone()
1283 } else {
1284 JsonValue::from(0)
1285 }
1286 }
1287
1288 fn min(&mut self, field: &str) -> JsonValue {
1289 self.params.fields[field] = format!("min({field}) as {field}").into();
1290 let sql = self.params.select_sql();
1291 if self.params.sql {
1292 return JsonValue::from(sql.clone());
1293 }
1294 let (state, data) = self.query(sql.as_str());
1295 if state {
1296 if data.len() > 1 {
1297 return data;
1298 }
1299 data[0][field].clone()
1300 } else {
1301 JsonValue::from(0)
1302 }
1303 }
1304
1305 fn sum(&mut self, field: &str) -> JsonValue {
1306 self.params.fields[field] = format!("sum({field}) as {field}").into();
1307 let sql = self.params.select_sql();
1308 if self.params.sql {
1309 return JsonValue::from(sql.clone());
1310 }
1311 let (state, data) = self.query(sql.as_str());
1312 match state {
1313 true => {
1314 if data.len() > 1 {
1315 return data;
1316 }
1317 data[0][field].clone()
1318 }
1319 false => JsonValue::from(0),
1320 }
1321 }
1322
1323 fn avg(&mut self, field: &str) -> JsonValue {
1324 self.params.fields[field] = format!("avg({field}) as {field}").into();
1325 let sql = self.params.select_sql();
1326 if self.params.sql {
1327 return JsonValue::from(sql.clone());
1328 }
1329 let (state, data) = self.query(sql.as_str());
1330 if state {
1331 if data.len() > 1 {
1332 return data;
1333 }
1334 data[0][field].clone()
1335 } else {
1336 JsonValue::from(0)
1337 }
1338 }
1339
1340 fn having(&mut self, expr: &str) -> &mut Self {
1341 self.params.having.push(expr.to_string());
1342 self
1343 }
1344
1345 fn select(&mut self) -> JsonValue {
1346 let sql = self.params.select_sql();
1347 if self.params.sql {
1348 return JsonValue::from(sql.clone());
1349 }
1350 let (state, mut data) = self.query(sql.as_str());
1351 match state {
1352 true => {
1353 for (field, _) in self.params.json.entries() {
1354 for item in data.members_mut() {
1355 if !item[field].is_empty() {
1356 let json = item[field].to_string();
1357 item[field] = match json::parse(&json) {
1358 Ok(e) => e,
1359 Err(_) => JsonValue::from(json),
1360 };
1361 }
1362 }
1363 }
1364 data.clone()
1365 }
1366 false => array![],
1367 }
1368 }
1369
1370 fn find(&mut self) -> JsonValue {
1371 self.params.page = 1;
1372 self.params.limit = 1;
1373 let sql = self.params.select_sql();
1374 if self.params.sql {
1375 return JsonValue::from(sql.clone());
1376 }
1377 let (state, mut data) = self.query(sql.as_str());
1378 match state {
1379 true => {
1380 if data.is_empty() {
1381 return object! {};
1382 }
1383 for (field, _) in self.params.json.entries() {
1384 if !data[0][field].is_empty() {
1385 let json = data[0][field].to_string();
1386 let json = json::parse(&json).unwrap_or(array![]);
1387 data[0][field] = json;
1388 } else {
1389 data[0][field] = array![];
1390 }
1391 }
1392 data[0].clone()
1393 }
1394 false => {
1395 object! {}
1396 }
1397 }
1398 }
1399
1400 fn value(&mut self, field: &str) -> JsonValue {
1401 self.params.fields = object! {};
1402 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1403 self.params.page = 1;
1404 self.params.limit = 1;
1405 let sql = self.params.select_sql();
1406 if self.params.sql {
1407 return JsonValue::from(sql.clone());
1408 }
1409 let (state, mut data) = self.query(sql.as_str());
1410 match state {
1411 true => {
1412 for (field, _) in self.params.json.entries() {
1413 if !data[0][field].is_empty() {
1414 let json = data[0][field].to_string();
1415 let json = json::parse(&json).unwrap_or(array![]);
1416 data[0][field] = json;
1417 } else {
1418 data[0][field] = array![];
1419 }
1420 }
1421 data[0][field].clone()
1422 }
1423 false => {
1424 if self.connection.debug {
1425 info!("{data:?}");
1426 }
1427 JsonValue::Null
1428 }
1429 }
1430 }
1431
1432 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1433 let fields_list = self.table_info(&self.params.table.clone());
1434 let mut fields = vec![];
1435 let mut values = vec![];
1436 if !self.params.autoinc && data["id"].is_empty() {
1437 let thread_id = format!("{:?}", std::thread::current().id());
1438 let thread_num: u64 = thread_id
1439 .trim_start_matches("ThreadId(")
1440 .trim_end_matches(")")
1441 .parse()
1442 .unwrap_or(0);
1443 data["id"] = format!(
1444 "{:X}{:X}",
1445 Local::now().timestamp_nanos_opt().unwrap_or(0),
1446 thread_num
1447 )
1448 .into();
1449 }
1450 for (field, value) in data.entries() {
1451 fields.push(format!("\"{}\"", field));
1452
1453 if value.is_string() {
1454 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1455 continue;
1456 } else if value.is_array() {
1457 if self.params.json[field].is_empty() {
1458 let array = value
1459 .members()
1460 .map(|x| x.as_str().unwrap_or(""))
1461 .collect::<Vec<&str>>()
1462 .join(",");
1463 values.push(format!("'{}'", array.replace("'", "''")));
1464 } else {
1465 let json = value.to_string();
1466 let json = json.replace("'", "''");
1467 values.push(format!("'{json}'"));
1468 }
1469 continue;
1470 } else if value.is_object() {
1471 if self.params.json[field].is_empty() {
1472 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1473 } else {
1474 let json = value.to_string();
1475 let json = json.replace("'", "''");
1476 values.push(format!("'{json}'"));
1477 }
1478 continue;
1479 } else if value.is_number() {
1480 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1481 if col_type == "boolean" {
1482 let bool_val = value.as_i64().unwrap_or(0) != 0;
1483 values.push(format!("{bool_val}"));
1484 } else if col_type.contains("int") {
1485 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1486 } else {
1487 values.push(format!("{value}"));
1488 }
1489 continue;
1490 } else if value.is_boolean() || value.is_null() {
1491 values.push(format!("{value}"));
1492 continue;
1493 } else {
1494 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1495 continue;
1496 }
1497 }
1498 let fields = fields.join(",");
1499 let values = values.join(",");
1500
1501 let sql = format!(
1502 "INSERT INTO {} ({}) VALUES ({});",
1503 self.params.table, fields, values
1504 );
1505 if self.params.sql {
1506 return JsonValue::from(sql.clone());
1507 }
1508 let (state, ids) = self.execute(sql.as_str());
1509
1510 match state {
1511 true => match self.params.autoinc {
1512 true => ids.clone(),
1513 false => data["id"].clone(),
1514 },
1515 false => {
1516 let thread_id = format!("{:?}", thread::current().id());
1517 error!("添加失败: {thread_id} {ids:?} {sql}");
1518 JsonValue::from("")
1519 }
1520 }
1521 }
1522 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1523 let fields_list = self.table_info(&self.params.table.clone());
1524 let mut fields = String::new();
1525 if !self.params.autoinc && data[0]["id"].is_empty() {
1526 data[0]["id"] = "".into();
1527 }
1528 for (field, _) in data[0].entries() {
1529 fields = format!("{fields},\"{field}\"");
1530 }
1531 fields = fields.trim_start_matches(",").to_string();
1532
1533 let core_count = num_cpus::get();
1534 let mut p = pools::Pool::new(core_count * 4);
1535
1536 let autoinc = self.params.autoinc;
1537 for list in data.members() {
1538 let mut item = list.clone();
1539 let i = br_fields::str::Code::verification_code(3);
1540 let fields_list_new = fields_list.clone();
1541 p.execute(move |pcindex| {
1542 if !autoinc && item["id"].is_empty() {
1543 let id = format!(
1544 "{:X}{:X}{}",
1545 Local::now().timestamp_nanos_opt().unwrap_or(0),
1546 pcindex,
1547 i
1548 );
1549 item["id"] = id.into();
1550 }
1551 let mut values = "".to_string();
1552 for (field, value) in item.entries() {
1553 if value.is_string() {
1554 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1555 } else if value.is_number() {
1556 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1557 if col_type == "boolean" {
1558 let bool_val = value.as_i64().unwrap_or(0) != 0;
1559 values = format!("{values},{bool_val}");
1560 } else if col_type.contains("int") {
1561 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1562 } else {
1563 values = format!("{values},{value}");
1564 }
1565 } else if value.is_boolean() {
1566 values = format!("{values},{value}");
1567 continue;
1568 } else {
1569 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1570 }
1571 }
1572 values = format!("({})", values.trim_start_matches(","));
1573 array![item["id"].clone(), values]
1574 });
1575 }
1576 let (ids_list, mut values) = p.insert_all();
1577 values = values.trim_start_matches(",").to_string();
1578 let sql = format!(
1579 "INSERT INTO {} ({}) VALUES {};",
1580 self.params.table, fields, values
1581 );
1582
1583 if self.params.sql {
1584 return JsonValue::from(sql.clone());
1585 }
1586 let (state, data) = self.execute(sql.as_str());
1587 match state {
1588 true => match autoinc {
1589 true => data,
1590 false => JsonValue::from(ids_list),
1591 },
1592 false => {
1593 error!("insert_all: {data:?}");
1594
1595 array![]
1596 }
1597 }
1598 }
1599 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1600 let fields_list = self.table_info(&self.params.table.clone());
1601 let mut fields = vec![];
1602 let mut values = vec![];
1603 if !self.params.autoinc && data["id"].is_empty() {
1604 let thread_id = format!("{:?}", std::thread::current().id());
1605 let thread_num: u64 = thread_id
1606 .trim_start_matches("ThreadId(")
1607 .trim_end_matches(")")
1608 .parse()
1609 .unwrap_or(0);
1610 data["id"] = format!(
1611 "{:X}{:X}",
1612 Local::now().timestamp_nanos_opt().unwrap_or(0),
1613 thread_num
1614 )
1615 .into();
1616 }
1617 for (field, value) in data.entries() {
1618 fields.push(format!("\"{}\"", field));
1619
1620 if value.is_string() {
1621 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1622 continue;
1623 } else if value.is_array() {
1624 if self.params.json[field].is_empty() {
1625 let array = value
1626 .members()
1627 .map(|x| x.as_str().unwrap_or(""))
1628 .collect::<Vec<&str>>()
1629 .join(",");
1630 values.push(format!("'{}'", array.replace("'", "''")));
1631 } else {
1632 let json = value.to_string();
1633 let json = json.replace("'", "''");
1634 values.push(format!("'{json}'"));
1635 }
1636 continue;
1637 } else if value.is_object() {
1638 if self.params.json[field].is_empty() {
1639 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1640 } else {
1641 let json = value.to_string();
1642 let json = json.replace("'", "''");
1643 values.push(format!("'{json}'"));
1644 }
1645 continue;
1646 } else if value.is_number() {
1647 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1648 if col_type == "boolean" {
1649 let bool_val = value.as_i64().unwrap_or(0) != 0;
1650 values.push(format!("{bool_val}"));
1651 } else if col_type.contains("int") {
1652 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1653 } else {
1654 values.push(format!("{value}"));
1655 }
1656 continue;
1657 } else if value.is_boolean() || value.is_null() {
1658 values.push(format!("{value}"));
1659 continue;
1660 } else {
1661 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1662 continue;
1663 }
1664 }
1665
1666 let conflict_cols: Vec<String> = conflict_fields
1667 .iter()
1668 .map(|f| format!("\"{}\"", f))
1669 .collect();
1670
1671 let update_set: Vec<String> = fields
1672 .iter()
1673 .filter(|f| {
1674 let name = f.trim_matches('"');
1675 !conflict_fields.contains(&name) && name != "id"
1676 })
1677 .map(|f| format!("{f}=EXCLUDED.{f}"))
1678 .collect();
1679
1680 let fields_str = fields.join(",");
1681 let values_str = values.join(",");
1682
1683 let sql = format!(
1684 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1685 self.params.table,
1686 fields_str,
1687 values_str,
1688 conflict_cols.join(","),
1689 update_set.join(",")
1690 );
1691 if self.params.sql {
1692 return JsonValue::from(sql.clone());
1693 }
1694 let (state, result) = self.execute(sql.as_str());
1695 match state {
1696 true => match self.params.autoinc {
1697 true => result.clone(),
1698 false => data["id"].clone(),
1699 },
1700 false => {
1701 let thread_id = format!("{:?}", thread::current().id());
1702 error!("upsert失败: {thread_id} {result:?} {sql}");
1703 JsonValue::from("")
1704 }
1705 }
1706 }
1707 fn update(&mut self, data: JsonValue) -> JsonValue {
1708 let fields_list = self.table_info(&self.params.table.clone());
1709 let mut values = vec![];
1710 for (field, value) in data.entries() {
1711 if value.is_string() {
1712 values.push(format!(
1713 "\"{}\"='{}'",
1714 field,
1715 value.to_string().replace("'", "''")
1716 ));
1717 } else if value.is_number() {
1718 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1719 if col_type == "boolean" {
1720 let bool_val = value.as_i64().unwrap_or(0) != 0;
1721 values.push(format!("\"{field}\"= {bool_val}"));
1722 } else if col_type.contains("int") {
1723 values.push(format!(
1724 "\"{}\"= {}",
1725 field,
1726 value.as_f64().unwrap_or(0.0) as i64
1727 ));
1728 } else {
1729 values.push(format!("\"{field}\"= {value}"));
1730 }
1731 } else if value.is_array() {
1732 if self.params.json[field].is_empty() {
1733 let array = value
1734 .members()
1735 .map(|x| x.as_str().unwrap_or(""))
1736 .collect::<Vec<&str>>()
1737 .join(",");
1738 values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1739 } else {
1740 let json = value.to_string();
1741 let json = json.replace("'", "''");
1742 values.push(format!("\"{field}\"='{json}'"));
1743 }
1744 continue;
1745 } else if value.is_object() {
1746 if self.params.json[field].is_empty() {
1747 values.push(format!("\"{}\"='{}'", field, value.to_string().replace("'", "''")));
1748 } else {
1749 if value.is_empty() {
1750 values.push(format!("\"{field}\"=''"));
1751 continue;
1752 }
1753 let json = value.to_string();
1754 let json = json.replace("'", "''");
1755 values.push(format!("\"{field}\"='{json}'"));
1756 }
1757 continue;
1758 } else if value.is_boolean() || value.is_null() {
1759 values.push(format!("\"{field}\"= {value}"));
1760 } else {
1761 values.push(format!("\"{field}\"=\"{value}\""));
1762 }
1763 }
1764
1765 for (field, value) in self.params.inc_dec.entries() {
1766 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1767 }
1768 if !self.params.update_column.is_empty() {
1769 values.extend(self.params.update_column.clone());
1770 }
1771 let values = values.join(",");
1772
1773 let sql = format!(
1774 "UPDATE {} SET {} {};",
1775 self.params.table.clone(),
1776 values,
1777 self.params.where_sql()
1778 );
1779 if self.params.sql {
1780 return JsonValue::from(sql.clone());
1781 }
1782 let (state, data) = self.execute(sql.as_str());
1783 if state {
1784 data
1785 } else {
1786 let thread_id = format!("{:?}", thread::current().id());
1787 error!("update: {thread_id} {data:?} {sql}");
1788 0.into()
1789 }
1790 }
1791 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1792 let fields_list = self.table_info(&self.params.table.clone());
1793 let mut values = vec![];
1794
1795 let mut ids = vec![];
1796 for (field, _) in data[0].entries() {
1797 if field == "id" {
1798 continue;
1799 }
1800 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1801 let mut fields = vec![];
1802 for row in data.members() {
1803 let value = row[field].clone();
1804 let id = row["id"].clone();
1805 ids.push(id.clone());
1806 if value.is_string() {
1807 fields.push(format!(
1808 "WHEN '{}' THEN '{}'",
1809 id,
1810 value.to_string().replace("'", "''")
1811 ));
1812 } else if value.is_array() || value.is_object() {
1813 if self.params.json[field].is_empty() {
1814 fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1815 } else {
1816 let json = value.to_string();
1817 let json = json.replace("'", "''");
1818 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1819 }
1820 continue;
1821 } else if value.is_number() {
1822 if col_type == "boolean" {
1823 let bool_val = value.as_i64().unwrap_or(0) != 0;
1824 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1825 } else {
1826 fields.push(format!("WHEN '{id}' THEN {value}"));
1827 }
1828 } else if value.is_boolean() || value.is_null() {
1829 fields.push(format!("WHEN '{id}' THEN {value}"));
1830 } else {
1831 fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1832 }
1833 }
1834 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1835 }
1836 self.where_and("id", "in", ids.into());
1837 for (field, value) in self.params.inc_dec.entries() {
1838 values.push(format!("{} = {}", field, value.to_string().clone()));
1839 }
1840
1841 let values = values.join(",");
1842 let sql = format!(
1843 "UPDATE {} SET {} {} {};",
1844 self.params.table.clone(),
1845 values,
1846 self.params.where_sql(),
1847 self.params.page_limit_sql()
1848 );
1849 if self.params.sql {
1850 return JsonValue::from(sql.clone());
1851 }
1852 let (state, data) = self.execute(sql.as_str());
1853 if state {
1854 data
1855 } else {
1856 error!("update_all: {data:?}");
1857 JsonValue::from(0)
1858 }
1859 }
1860
1861 fn delete(&mut self) -> JsonValue {
1862 let sql = format!(
1863 "delete FROM {} {} {};",
1864 self.params.table.clone(),
1865 self.params.where_sql(),
1866 self.params.page_limit_sql()
1867 );
1868 if self.params.sql {
1869 return JsonValue::from(sql.clone());
1870 }
1871 let (state, data) = self.execute(sql.as_str());
1872 match state {
1873 true => data,
1874 false => {
1875 error!("delete 失败>>> {data:?}");
1876 JsonValue::from(0)
1877 }
1878 }
1879 }
1880
1881 fn transaction(&mut self) -> bool {
1882 let thread_id = format!("{:?}", thread::current().id());
1883 let key = format!("{}{}", self.default, thread_id);
1884
1885 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1886 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1887 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1888 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1889 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1890 return true;
1891 }
1892
1893 let mut conn = None;
1895 for attempt in 0..3u8 {
1896 match self.client.get_connect_for_transaction() {
1897 Ok(mut c) => {
1898 if c.is_valid() {
1899 conn = Some(c);
1900 break;
1901 }
1902 warn!("事务连接无效(第{}次)", attempt + 1);
1903 self.client.release_transaction_conn();
1904 }
1905 Err(e) => {
1906 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1907 }
1908 }
1909 if attempt < 2 {
1910 thread::sleep(std::time::Duration::from_millis(200));
1911 }
1912 }
1913 let mut conn = match conn {
1914 Some(c) => c,
1915 None => {
1916 error!("获取事务连接重试耗尽");
1917 return false;
1918 }
1919 };
1920
1921 if let Err(e) = conn.execute("START TRANSACTION") {
1922 error!("启动事务失败: {e}");
1923 self.client.release_transaction_conn();
1924 return false;
1925 }
1926
1927
1928 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1929 true
1930 }
1931 fn commit(&mut self) -> bool {
1932 let thread_id = format!("{:?}", thread::current().id());
1933 let key = format!("{}{}", self.default, thread_id);
1934
1935 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1936 error!("commit: 没有活跃的事务");
1937 return false;
1938 }
1939
1940 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1941 if depth > 1 {
1942 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1943 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1944 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1945 return true;
1946 }
1947
1948 let commit_result =
1949 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1950
1951 let success = match commit_result {
1952 Some(Ok(_)) => true,
1953 Some(Err(e)) => {
1954 error!("提交事务失败: {e}");
1955 false
1956 }
1957 None => {
1958 error!("提交事务失败: 未找到连接");
1959 false
1960 }
1961 };
1962
1963 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
1964 self.client.release_transaction_conn_with_conn(conn);
1965 } else {
1966 self.client.release_transaction_conn();
1967 }
1968 success
1969 }
1970
1971 fn rollback(&mut self) -> bool {
1972 let thread_id = format!("{:?}", thread::current().id());
1973 let key = format!("{}{}", self.default, thread_id);
1974
1975 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1976 error!("rollback: 没有活跃的事务");
1977 return false;
1978 }
1979
1980 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1981 if depth > 1 {
1982 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1983 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1984 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1985 return true;
1986 }
1987
1988 let rollback_result =
1989 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1990
1991 let success = match rollback_result {
1992 Some(Ok(_)) => true,
1993 Some(Err(e)) => {
1994 error!("回滚失败: {e}");
1995 false
1996 }
1997 None => {
1998 error!("回滚失败: 未找到连接");
1999 false
2000 }
2001 };
2002
2003 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
2004 self.client.release_transaction_conn_with_conn(conn);
2005 } else {
2006 self.client.release_transaction_conn();
2007 }
2008 success
2009 }
2010
2011 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2012 let (state, data) = self.query(sql);
2013 match state {
2014 true => Ok(data),
2015 false => Err(data.to_string()),
2016 }
2017 }
2018
2019 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2020 let (state, data) = self.execute(sql);
2021 match state {
2022 true => Ok(data),
2023 false => Err(data.to_string()),
2024 }
2025 }
2026
2027 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2028 self.params.inc_dec[field] = format!("{field} + {num}").into();
2029 self
2030 }
2031
2032 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2033 self.params.inc_dec[field] = format!("{field} - {num}").into();
2034 self
2035 }
2036 fn buildsql(&mut self) -> String {
2037 self.fetch_sql();
2038 let sql = self.select().to_string();
2039 format!("( {} ) {}", sql, self.params.table)
2040 }
2041
2042 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2043 for field in fields {
2044 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2045 }
2046 self
2047 }
2048
2049 fn join(
2050 &mut self,
2051 main_table: &str,
2052 main_fields: &str,
2053 right_table: &str,
2054 right_fields: &str,
2055 ) -> &mut Self {
2056 let main_table = if main_table.is_empty() {
2057 self.params.table.clone()
2058 } else {
2059 main_table.to_string()
2060 };
2061 self.params.join_table = right_table.to_string();
2062 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2063 self
2064 }
2065
2066 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2067 let main_fields = if main_fields.is_empty() {
2068 "id"
2069 } else {
2070 main_fields
2071 };
2072 let second_fields = if second_fields.is_empty() {
2073 self.params.table.clone()
2074 } else {
2075 second_fields.to_string().clone()
2076 };
2077 let sec_table_name = format!("{}{}", table, "_2");
2078 let second_table = format!("{} {}", table, sec_table_name.clone());
2079 self.params.join_table = sec_table_name.clone();
2080 self.params.join.push(format!(
2081 " INNER JOIN {} ON {}.{} = {}.{}",
2082 second_table, self.params.table, main_fields, sec_table_name, second_fields
2083 ));
2084 self
2085 }
2086
2087 fn join_right(
2088 &mut self,
2089 main_table: &str,
2090 main_fields: &str,
2091 right_table: &str,
2092 right_fields: &str,
2093 ) -> &mut Self {
2094 let main_table = if main_table.is_empty() {
2095 self.params.table.clone()
2096 } else {
2097 main_table.to_string()
2098 };
2099 self.params.join_table = right_table.to_string();
2100 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2101 self
2102 }
2103
2104 fn join_full(
2105 &mut self,
2106 main_table: &str,
2107 main_fields: &str,
2108 right_table: &str,
2109 right_fields: &str,
2110 ) -> &mut Self {
2111 let main_table = if main_table.is_empty() {
2112 self.params.table.clone()
2113 } else {
2114 main_table.to_string()
2115 };
2116 self.params.join_table = right_table.to_string();
2117 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2118 self
2119 }
2120
2121 fn union(&mut self, sub_sql: &str) -> &mut Self {
2122 self.params.unions.push(format!("UNION {sub_sql}"));
2123 self
2124 }
2125
2126 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2127 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2128 self
2129 }
2130
2131 fn lock_for_update(&mut self) -> &mut Self {
2132 self.params.lock_mode = "FOR UPDATE".to_string();
2133 self
2134 }
2135
2136 fn lock_for_share(&mut self) -> &mut Self {
2137 self.params.lock_mode = "FOR SHARE".to_string();
2138 self
2139 }
2140}