1use crate::config::Connection;
2use crate::TABLE_FIELDS;
3use crate::pools;
4use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
5use crate::types::{DbMode, Mode, Params, TableOptions};
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
12#[derive(Clone)]
13pub struct Pgsql {
14 pub connection: Connection,
16 pub default: String,
18 pub params: Params,
19 pub client: Pools,
20}
21
22impl Pgsql {
23 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24 let port = connection
25 .hostport
26 .parse::<i32>()
27 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29 let cp_connection = connection.clone();
30 let config = object! {
31 debug: cp_connection.debug,
32 username: cp_connection.username,
33 userpass: cp_connection.userpass,
34 database: cp_connection.database,
35 hostname: cp_connection.hostname,
36 hostport: port,
37 charset: cp_connection.charset.str(),
38 pool_max: cp_connection.pool.max_connections,
39 };
40 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
41
42 let pools = pgsql.pools()?;
43 Ok(Self {
44 connection,
45 default: default.clone(),
46 params: Params::default("pgsql"),
47 client: pools,
48 })
49 }
50
51 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
52 let thread_id = format!("{:?}", thread::current().id());
53 let key = format!("{}{}", self.default, thread_id);
54
55 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
56 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
57
58 match result {
59 Some(Ok(e)) => {
60 if self.connection.debug {
61 info!("查询成功: {} {}", thread_id.clone(), sql);
62 }
63 (true, e.rows)
64 }
65 Some(Err(e)) => {
66 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
67 (false, JsonValue::from(e.to_string()))
68 }
69 None => {
70 error!("事务查询失败: 未找到事务连接 {thread_id}");
71 (false, JsonValue::from("未找到事务连接"))
72 }
73 }
74 } else {
75 let mut guard = match self.client.get_guard() {
76 Ok(g) => g,
77 Err(e) => {
78 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
80 return (false, JsonValue::from(e.to_string()));
81 }
82 };
83 match guard.conn().query(sql) {
84 Ok(e) => {
85 if self.connection.debug {
86 info!("查询成功: {} {}", thread_id.clone(), sql);
87 }
88 (true, e.rows)
89 }
90 Err(ref e) if Self::is_retriable_error(e) => {
91 guard.discard();
94 self.client.flush_idle();
96 warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
97 thread::sleep(std::time::Duration::from_millis(200));
98 let mut guard2 = match self.client.get_guard() {
99 Ok(g) => g,
100 Err(e) => {
101 error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
102 return (false, JsonValue::from(e.to_string()));
103 }
104 };
105 match guard2.conn().query(sql) {
106 Ok(e) => {
107 if self.connection.debug {
108 info!("查询成功(重试): {} {}", thread_id.clone(), sql);
109 }
110 (true, e.rows)
111 }
112 Err(e) => {
113 error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
114 (false, JsonValue::from(e.to_string()))
115 }
116 }
117 }
118 Err(e) => {
119 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
120 (false, JsonValue::from(e.to_string()))
121 }
122 }
123 }
124 }
125 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
126 let thread_id = format!("{:?}", thread::current().id());
127 let key = format!("{}{}", self.default, thread_id);
128
129 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
130 if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
132 &self.params.table,
133 &thread_id,
134 std::time::Duration::from_secs(30),
135 ) {
136 error!("获取表锁超时: {} {}", self.params.table, thread_id);
137 return (false, JsonValue::from("table lock timeout"));
138 }
139 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
140
141 match result {
142 Some(Ok(e)) => {
143 if self.connection.debug {
144 info!("提交成功: {} {}", thread_id.clone(), sql);
145 }
146 if sql.contains("INSERT") {
147 (true, e.rows)
148 } else {
149 (true, e.affect_count.into())
150 }
151 }
152 Some(Err(e)) => {
153 error!("事务提交失败: {thread_id} {e}");
154 (false, JsonValue::from(e.to_string()))
155 }
156 None => {
157 error!("事务执行失败: 未找到事务连接 {thread_id}");
158 (false, JsonValue::from("未找到事务连接"))
159 }
160 }
161 } else {
162 let mut guard = match self.client.get_guard() {
164 Ok(g) => g,
165 Err(e) => {
166 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
167 return (false, JsonValue::from(e.to_string()));
168 }
169 };
170 match guard.conn().execute(sql) {
171 Ok(e) => {
172 if self.connection.debug {
173 info!("提交成功: {} {}", thread_id.clone(), sql);
174 }
175 if sql.contains("INSERT") {
176 (true, e.rows)
177 } else {
178 (true, e.affect_count.into())
179 }
180 }
181 Err(ref e) if Self::is_retriable_error(e) => {
182 guard.discard();
185 self.client.flush_idle();
187 warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
188 thread::sleep(std::time::Duration::from_millis(200));
189 let mut guard2 = match self.client.get_guard() {
190 Ok(g) => g,
191 Err(e) => {
192 error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
193 return (false, JsonValue::from(e.to_string()));
194 }
195 };
196 match guard2.conn().execute(sql) {
197 Ok(e) => {
198 if self.connection.debug {
199 info!("提交成功(重试): {} {}", thread_id.clone(), sql);
200 }
201 if sql.contains("INSERT") {
202 (true, e.rows)
203 } else {
204 (true, e.affect_count.into())
205 }
206 }
207 Err(e) => {
208 error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
209 (false, JsonValue::from(e.to_string()))
210 }
211 }
212 }
213 Err(e) => {
214 error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
215 (false, JsonValue::from(e.to_string()))
216 }
217 }
218 }
219 }
220
221 fn is_retriable_error(e: &PgsqlError) -> bool {
223 matches!(
224 e,
225 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
226 )
227 }
228}
229
230impl DbMode for Pgsql {
231 fn database_tables(&mut self) -> JsonValue {
232 let sql = "SHOW TABLES".to_string();
233 match self.sql(sql.as_str()) {
234 Ok(e) => {
235 let mut list = vec![];
236 for item in e.members() {
237 for (_, value) in item.entries() {
238 list.push(value.clone());
239 }
240 }
241 list.into()
242 }
243 Err(_) => {
244 array![]
245 }
246 }
247 }
248
249 fn database_create(&mut self, name: &str) -> bool {
250 let sql = format!("CREATE DATABASE {name}");
251
252 let (state, data) = self.execute(sql.as_str());
253 match state {
254 true => data.as_bool().unwrap_or(true),
255 false => {
256 error!("创建数据库失败: {data:?}");
257 false
258 }
259 }
260 }
261
262 fn truncate(&mut self, table: &str) -> bool {
263 let sql = format!("TRUNCATE TABLE {table}");
264 let (state, _) = self.execute(sql.as_str());
265 state
266 }
267}
268
269impl Mode for Pgsql {
270 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
271 let mut sql = String::new();
272 let mut comments = vec![];
273
274 if !options.table_unique.is_empty() {
275 let full_name = format!(
276 "{}_unique_{}",
277 options.table_name,
278 options.table_unique.join("_")
279 );
280 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
281 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
282 let unique = format!(
283 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
284 name,
285 options.table_name,
286 options.table_unique.join(",")
287 );
288 comments.push(unique);
289 }
290
291 for row in options.table_index.iter() {
292 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
293 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
294 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
295 let index = format!(
296 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
297 name,
298 options.table_name,
299 row.join(",")
300 );
301 comments.push(index);
302 }
303
304 for (name, field) in options.table_fields.entries_mut() {
305 field["table_name"] = options.table_name.clone().into();
306 let row = br_fields::field("pgsql", name, field.clone());
307 let (col_sql, meta) = if let Some(idx) = row.find("--") {
308 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
309 } else {
310 (row.trim(), None)
311 };
312 if let Some(meta) = meta {
313 comments.push(format!(
314 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
315 options.table_name, name, meta
316 ));
317 }
318 sql = format!("{} {},\r\n", sql, col_sql);
319 }
320
321 let primary_key = format!(
322 "CONSTRAINT {}_{} PRIMARY KEY ({})",
323 options.table_name, options.table_key, options.table_key
324 );
325 let sql = format!(
326 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
327 options.table_name,
328 sql.trim_end_matches(",\r\n"),
329 primary_key
330 );
331 comments.insert(0, sql);
332
333 for (_name, field) in options.table_fields.entries() {
334 let _ = field["mode"].as_str();
335 }
336
337 if self.params.sql {
338 let info = comments.join("\r\n");
339 return JsonValue::from(info);
340 }
341 for comment in comments {
342 let (state, _) = self.execute(comment.as_str());
343 match state {
344 true => {}
345 false => {
346 return JsonValue::from(state);
347 }
348 }
349 }
350 JsonValue::from(true)
351 }
352
353 fn table_update(&mut self, options: TableOptions) -> JsonValue {
354 let cache_key = format!("{}{}", self.default, options.table_name);
356 let table_fields_guard = match TABLE_FIELDS.read() {
357 Ok(g) => g,
358 Err(e) => e.into_inner(),
359 };
360 if table_fields_guard.get(&cache_key).is_some() {
361 drop(table_fields_guard);
362 let mut table_fields_guard = match TABLE_FIELDS.write() {
363 Ok(g) => g,
364 Err(e) => e.into_inner(),
365 };
366 table_fields_guard.remove(&cache_key);
367 } else {
368 drop(table_fields_guard);
369 }
370 let fields_list = self.table_info(&options.table_name);
371 let mut put = vec![];
372 let mut add = vec![];
373 let mut del = vec![];
374 let mut comments = vec![];
375
376 for (key, _) in fields_list.entries() {
377 if options.table_fields[key].is_empty() {
378 del.push(key);
379 }
380 }
381 for (name, field) in options.table_fields.entries() {
382 if !fields_list[name].is_empty() {
383 let old_info = &fields_list[name];
384 let new_field_sql = br_fields::field("pgsql", name, field.clone());
385
386 let old_comment = old_info["comment"].as_str().unwrap_or("");
387 let old_type = old_info["type"].as_str().unwrap_or("");
388
389 let new_comment = if let Some(idx) = new_field_sql.find("--") {
390 new_field_sql[idx + 2..].trim()
391 } else {
392 ""
393 };
394
395 let comment_matches =
396 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
397 let old_parts: Vec<&str> = old_comment.split('|').collect();
398 let new_parts: Vec<&str> = new_comment.split('|').collect();
399 if old_parts.len() >= 4 && new_parts.len() >= 4 {
400 old_parts[..4] == new_parts[..4]
401 } else {
402 old_comment == new_comment
403 }
404 } else if !old_comment.is_empty() && !new_comment.is_empty() {
405 let old_parts: Vec<&str> = old_comment.split('|').collect();
406 let new_parts: Vec<&str> = new_comment.split('|').collect();
407 if old_parts.len() >= 2
408 && new_parts.len() >= 2
409 && old_parts.len() == new_parts.len()
410 {
411 let old_filtered: Vec<&str> = old_parts
412 .iter()
413 .filter(|v| **v != "true" && **v != "false")
414 .copied()
415 .collect();
416 let new_filtered: Vec<&str> = new_parts
417 .iter()
418 .filter(|v| **v != "true" && **v != "false")
419 .copied()
420 .collect();
421 old_filtered == new_filtered
422 } else {
423 old_comment == new_comment
424 }
425 } else {
426 old_comment == new_comment
427 };
428
429 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
430 let new_type = if sql_parts.len() > 1 {
431 sql_parts[1].to_lowercase()
432 } else {
433 String::new()
434 };
435
436 let type_matches = match old_type {
437 "integer" => {
438 new_type.contains("int")
439 && !new_type.contains("bigint")
440 && !new_type.contains("smallint")
441 }
442 "bigint" => new_type.contains("bigint"),
443 "smallint" => new_type.contains("smallint"),
444 "boolean" => new_type.contains("boolean"),
445 "text" => new_type.contains("text"),
446 "character varying" => {
447 if !new_type.contains("varchar") {
448 false
449 } else {
450 let old_len = old_info["max_length"].as_i64().unwrap_or(0);
451 let new_len = new_type
452 .trim_start_matches("varchar(")
453 .trim_end_matches(')')
454 .parse::<i64>()
455 .unwrap_or(0);
456 let matched = old_len == new_len || new_len == 0;
457 if !matched {
458 log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
459 }
460 old_len == new_len || new_len == 0
461 }
462 }
463 "character" => new_type.contains("char") && !new_type.contains("varchar"),
464 "numeric" => {
465 if !(new_type.contains("numeric") || new_type.contains("decimal")) {
466 false
467 } else {
468 let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
469 let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
470 let inner = new_type
471 .replace("numeric(", "")
472 .replace("decimal(", "")
473 .replace(')', "");
474 let parts: Vec<&str> = inner.split(',').collect();
475 let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
476 let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
477 old_prec == new_prec && old_scale == new_scale
478 }
479 }
480 "double precision" => {
481 new_type.contains("double") || new_type.contains("float8")
482 }
483 "real" => new_type.contains("real") || new_type.contains("float4"),
484 "timestamp without time zone" | "timestamp with time zone" => {
485 new_type.contains("timestamp")
486 }
487 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
488 "time without time zone" | "time with time zone" => {
489 new_type.contains("time") && !new_type.contains("timestamp")
490 }
491 "json" | "jsonb" => new_type.contains("json"),
492 "uuid" => new_type.contains("uuid"),
493 "bytea" => new_type.contains("bytea"),
494 _ => old_type == new_type,
495 };
496
497 if type_matches && comment_matches {
498 continue;
499 }
500
501 log::debug!(
502 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
503 options.table_name,
504 name,
505 type_matches,
506 old_type,
507 new_type,
508 comment_matches
509 );
510 put.push(name);
511 } else {
512 add.push(name);
513 }
514 }
515
516 for name in add.iter() {
517 let name = name.to_string();
518 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
519 let rows = row.split("--").collect::<Vec<&str>>();
520 comments.push(format!(
521 r#"ALTER TABLE "{}" add {};"#,
522 options.table_name,
523 rows[0].trim()
524 ));
525 if rows.len() > 1 {
526 comments.push(format!(
527 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
528 options.table_name,
529 name,
530 rows[1].trim()
531 ));
532 }
533 }
534 for name in del.iter() {
535 comments.push(format!(
536 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
537 options.table_name, name
538 ));
539 }
540 for name in put.iter() {
541 let name = name.to_string();
542 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
543 let rows = row.split("--").collect::<Vec<&str>>();
544
545 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
546
547 if sql[1].contains("BOOLEAN") {
548 let text = format!(
549 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
550 options.table_name, name
551 );
552 comments.push(text.clone());
553 let text = format!(
554 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
555 options.table_name, name, sql[1]
556 );
557 comments.push(text.clone());
558 } else {
559 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
560 let new_type_lower = sql[1].to_lowercase();
561 let is_date_to_numeric = (old_col_type == "date"
562 || old_col_type.contains("timestamp"))
563 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
564 if is_date_to_numeric {
565 comments.push(format!(
566 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
567 options.table_name, name
568 ));
569 comments.push(format!(
570 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
571 options.table_name, name, sql[1], name, name, name
572 ));
573 } else {
574 let text = format!(
575 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
576 options.table_name, name, sql[1]
577 );
578 comments.push(text.clone());
579 }
580 };
581
582 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
583 let default_value = rows[0][default_pos + 9..].trim();
584 if !default_value.is_empty() {
585 comments.push(format!(
586 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
587 options.table_name, name, default_value
588 ));
589 }
590 }
591 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
594 .as_str()
595 .unwrap_or("YES");
596 let old_is_required = old_is_nullable == "NO";
597
598 if old_is_required && name != options.table_key {
600 comments.push(format!(
601 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
602 options.table_name, name
603 ));
604 }
605
606 if rows.len() > 1 {
607 comments.push(format!(
608 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
609 options.table_name,
610 name,
611 rows[1].trim()
612 ));
613 }
614 }
615
616 let mut unique_new = vec![];
617 let mut index_new = vec![];
618 let mut primary_key = vec![];
619 let (_, index_list) = self.query(
620 format!(
621 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
622 options.table_name
623 )
624 .as_str(),
625 );
626 for item in index_list.members() {
627 let key_name = item["indexname"].as_str().unwrap_or("");
628 let indexdef = item["indexdef"].to_string();
629
630 if indexdef.contains(
631 format!(
632 "CREATE UNIQUE INDEX {}_{} ON",
633 options.table_name, options.table_key
634 )
635 .as_str(),
636 ) {
637 primary_key.push(key_name.to_string());
638 continue;
639 }
640 if indexdef.contains("CREATE UNIQUE INDEX") {
641 unique_new.push(key_name.to_string());
642 continue;
643 }
644 if indexdef.contains("CREATE INDEX") {
645 index_new.push(key_name.to_string());
646 continue;
647 }
648 }
649
650 if !options.table_unique.is_empty() {
651 let full_name = format!(
652 "{}_unique_{}",
653 options.table_name,
654 options.table_unique.join("_")
655 );
656 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
657 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
658 let unique = format!(
659 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
660 name,
661 options.table_name,
662 options.table_unique.join(",")
663 );
664 if !unique_new.contains(&name) {
665 comments.push(unique);
666 }
667 unique_new.retain(|x| *x != name);
668 }
669
670 for row in options.table_index.iter() {
671 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
672 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
673 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
674 let index = format!(
675 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
676 name,
677 options.table_name,
678 row.join(",")
679 );
680 if !index_new.contains(&name) {
681 comments.push(index);
682 }
683 index_new.retain(|x| *x != name);
684 }
685
686 for item in unique_new {
687 if item.ends_with("_pkey") {
688 continue;
689 }
690 if item.starts_with("unique_") {
691 comments.push(format!(
692 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
693 options.table_name,
694 item.clone()
695 ));
696 } else {
697 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
698 }
699 }
700 for item in index_new {
701 if item.ends_with("_pkey") {
702 continue;
703 }
704 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
705 }
706
707 if self.params.sql {
708 return JsonValue::from(comments.join(""));
709 }
710
711 if comments.is_empty() {
712 return JsonValue::from(-1);
713 }
714
715 for item in comments.iter() {
716 let (state, res) = self.execute(item.as_str());
717 match state {
718 true => {}
719 false => {
720 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
721 return JsonValue::from(0);
722 }
723 }
724 }
725 JsonValue::from(1)
726 }
727
728 fn table_info(&mut self, table: &str) -> JsonValue {
729 let cache_key = format!("{}{}", self.default, table);
731 let table_fields_guard = match TABLE_FIELDS.read() {
732 Ok(g) => g,
733 Err(e) => e.into_inner(),
734 };
735 if let Some(cached) = table_fields_guard.get(&cache_key) {
736 return cached.clone();
737 }
738 drop(table_fields_guard);
739 let sql = format!(
740 "SELECT COL.COLUMN_NAME,
741 COL.DATA_TYPE,
742 COL.IS_NULLABLE,
743 COL.CHARACTER_MAXIMUM_LENGTH,
744 COL.NUMERIC_PRECISION,
745 COL.NUMERIC_SCALE,
746 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
747 LEFT JOIN
748 pg_catalog.pg_description DESCRIPTION
749 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
750 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
751 let (state, data) = self.query(sql.as_str());
752 let mut list = object! {};
753 if state {
754 for item in data.members() {
755 let mut row = object! {};
756 row["field"] = item["column_name"].clone();
757 row["comment"] = item["comment"].clone();
758 row["type"] = item["data_type"].clone();
759 row["is_nullable"] = item["is_nullable"].clone();
760 row["max_length"] = item["character_maximum_length"].clone();
761 row["numeric_precision"] = item["numeric_precision"].clone();
762 row["numeric_scale"] = item["numeric_scale"].clone();
763 if let Some(field_name) = row["field"].as_str() {
764 list[field_name] = row.clone();
765 }
766 }
767 let mut table_fields_guard = match TABLE_FIELDS.write() {
769 Ok(g) => g,
770 Err(e) => e.into_inner(),
771 };
772 table_fields_guard.insert(cache_key, list.clone());
773 list
774 } else {
775 list
776 }
777 }
778
779 fn table_is_exist(&mut self, name: &str) -> bool {
780 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
781 let (state, data) = self.query(sql.as_str());
782 match state {
783 true => {
784 for item in data.members() {
785 if item.has_key("exists") {
786 return item["exists"].as_bool().unwrap_or(false);
787 }
788 }
789 false
790 }
791 false => false,
792 }
793 }
794
795 fn table(&mut self, name: &str) -> &mut Pgsql {
796 self.params = Params::default(self.connection.mode.str().as_str());
797 let table_name = format!("{}{}", self.connection.prefix, name);
798 if !super::sql_safety::validate_table_name(&table_name) {
799 error!("Invalid table name: {}", name);
800 }
801 self.params.table = table_name.clone();
802 self.params.join_table = table_name;
803 self
804 }
805
806 fn change_table(&mut self, name: &str) -> &mut Self {
807 self.params.join_table = name.to_string();
808 self
809 }
810
811 fn autoinc(&mut self) -> &mut Self {
812 self.params.autoinc = true;
813 self
814 }
815
816 fn timestamps(&mut self) -> &mut Self {
817 self.params.timestamps = true;
818 self
819 }
820
821 fn fetch_sql(&mut self) -> &mut Self {
822 self.params.sql = true;
823 self
824 }
825
826 fn order(&mut self, field: &str, by: bool) -> &mut Self {
827 self.params.order[field] = {
828 if by {
829 "DESC"
830 } else {
831 "ASC"
832 }
833 }
834 .into();
835 self
836 }
837
838 fn group(&mut self, field: &str) -> &mut Self {
839 let fields: Vec<&str> = field.split(",").collect();
840 for field in fields.iter() {
841 let field = field.to_string();
842 self.params.group[field.as_str()] = field.clone().into();
843 self.params.fields[field.as_str()] = field.clone().into();
844 }
845 self
846 }
847
848 fn distinct(&mut self) -> &mut Self {
849 self.params.distinct = true;
850 self
851 }
852
853 fn json(&mut self, field: &str) -> &mut Self {
854 let list: Vec<&str> = field.split(",").collect();
855 for item in list.iter() {
856 self.params.json[item.to_string().as_str()] = item.to_string().into();
857 }
858 self
859 }
860
861 fn location(&mut self, field: &str) -> &mut Self {
862 let list: Vec<&str> = field.split(",").collect();
863 for item in list.iter() {
864 self.params.location[item.to_string().as_str()] = item.to_string().into();
865 }
866 self
867 }
868
869 fn field(&mut self, field: &str) -> &mut Self {
870 let list: Vec<&str> = field.split(",").collect();
871 let join_table = if self.params.join_table.is_empty() {
872 self.params.table.clone()
873 } else {
874 self.params.join_table.clone()
875 };
876 for item in list.iter() {
877 let lower = item.to_lowercase();
878 let is_expr = lower.contains("count(")
879 || lower.contains("sum(")
880 || lower.contains("avg(")
881 || lower.contains("max(")
882 || lower.contains("min(")
883 || lower.contains("case ");
884 if is_expr {
885 self.params.fields[item.to_string().as_str()] = (*item).into();
886 } else if item.contains(" as ") {
887 let text = item.split(" as ").collect::<Vec<&str>>();
888 self.params.fields[item.to_string().as_str()] =
889 format!("{}.{} as {}", join_table, text[0], text[1]).into();
890 } else {
891 self.params.fields[item.to_string().as_str()] =
892 format!("{join_table}.{item}").into();
893 }
894 }
895 self
896 }
897
898 fn field_raw(&mut self, expr: &str) -> &mut Self {
899 self.params.fields[expr] = expr.into();
900 self
901 }
902
903 fn hidden(&mut self, name: &str) -> &mut Self {
904 let hidden: Vec<&str> = name.split(",").collect();
905
906 let fields_list = self.table_info(self.params.clone().table.as_str());
907 let mut data = array![];
908 for item in fields_list.members() {
909 let _ = data.push(object! {
910 "name":item["field"].as_str().unwrap_or("")
911 });
912 }
913
914 for item in data.members() {
915 let name = item["name"].as_str().unwrap_or("");
916 if !hidden.contains(&name) {
917 self.params.fields[name] = name.into();
918 }
919 }
920 self
921 }
922
923 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
924 for f in field.split('|') {
925 if !super::sql_safety::validate_field_name(f) {
926 error!("Invalid field name: {}", f);
927 }
928 }
929 if !super::sql_safety::validate_compare_orator(compare) {
930 error!("Invalid compare operator: {}", compare);
931 }
932 let join_table = if self.params.join_table.is_empty() {
933 self.params.table.clone()
934 } else {
935 self.params.join_table.clone()
936 };
937 if value.is_boolean() {
938 let bool_val = value.as_bool().unwrap_or(false);
939 self.params
940 .where_and
941 .push(format!("{join_table}.{field} {compare} {bool_val}"));
942 return self;
943 }
944 match compare {
945 "between" => {
946 self.params.where_and.push(format!(
947 "{}.{} between '{}' AND '{}'",
948 join_table, field, value[0], value[1]
949 ));
950 }
951 "set" => {
952 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
953 let mut wheredata = vec![];
954 for item in list.iter() {
955 wheredata.push(format!(
956 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
957 ));
958 }
959 self.params
960 .where_and
961 .push(format!("({})", wheredata.join(" or ")));
962 }
963 "notin" => {
964 let mut text = String::new();
965 for item in value.members() {
966 text = format!("{text},'{item}'");
967 }
968 text = text.trim_start_matches(",").into();
969 self.params
970 .where_and
971 .push(format!("{join_table}.{field} not in ({text})"));
972 }
973 "is" => {
974 self.params
975 .where_and
976 .push(format!("{join_table}.{field} is {value}"));
977 }
978 "isnot" => {
979 self.params
980 .where_and
981 .push(format!("{join_table}.{field} is not {value}"));
982 }
983 "notlike" => {
984 self.params
985 .where_and
986 .push(format!("{join_table}.{field} not like '{value}'"));
987 }
988 "in" => {
989 if value.is_array() && value.is_empty() {
990 self.params.where_and.push("1=0".to_string());
991 return self;
992 }
993 let mut text = String::new();
994 if value.is_array() {
995 for item in value.members() {
996 text = format!("{text},'{item}'");
997 }
998 } else if value.is_null() {
999 text = format!("{text},null");
1000 } else {
1001 let value = value.as_str().unwrap_or("");
1002
1003 let value: Vec<&str> = value.split(",").collect();
1004 for item in value.iter() {
1005 text = format!("{text},'{item}'");
1006 }
1007 }
1008 text = text.trim_start_matches(",").into();
1009
1010 self.params
1011 .where_and
1012 .push(format!("{join_table}.{field} {compare} ({text})"));
1013 }
1014 "json_contains" => {
1018 if value.is_array() {
1019 if value.is_empty() {
1020 self.params.where_and.push("1=0".to_string());
1021 } else {
1022 let mut parts = vec![];
1023 for item in value.members() {
1024 let escaped = super::sql_safety::escape_string(&item.to_string());
1025 parts.push(format!(
1026 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1027 escaped
1028 ));
1029 }
1030 self.params
1031 .where_and
1032 .push(format!("({})", parts.join(" OR ")));
1033 }
1034 } else {
1035 let escaped = super::sql_safety::escape_string(&value.to_string());
1036 self.params.where_and.push(format!(
1037 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1038 escaped
1039 ));
1040 }
1041 }
1042 _ => {
1043 self.params
1044 .where_and
1045 .push(format!("{join_table}.{field} {compare} '{value}'"));
1046 }
1047 }
1048 self
1049 }
1050
1051 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1052 for f in field.split('|') {
1053 if !super::sql_safety::validate_field_name(f) {
1054 error!("Invalid field name: {}", f);
1055 }
1056 }
1057 if !super::sql_safety::validate_compare_orator(compare) {
1058 error!("Invalid compare operator: {}", compare);
1059 }
1060 let join_table = if self.params.join_table.is_empty() {
1061 self.params.table.clone()
1062 } else {
1063 self.params.join_table.clone()
1064 };
1065
1066 if value.is_boolean() {
1067 let bool_val = value.as_bool().unwrap_or(false);
1068 self.params
1069 .where_or
1070 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1071 return self;
1072 }
1073
1074 match compare {
1075 "between" => {
1076 self.params.where_or.push(format!(
1077 "{}.{} between '{}' AND '{}'",
1078 join_table, field, value[0], value[1]
1079 ));
1080 }
1081 "set" => {
1082 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1083 let mut wheredata = vec![];
1084 for item in list.iter() {
1085 wheredata.push(format!(
1086 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1087 ));
1088 }
1089 self.params
1090 .where_or
1091 .push(format!("({})", wheredata.join(" or ")));
1092 }
1093 "notin" => {
1094 let mut text = String::new();
1095 for item in value.members() {
1096 text = format!("{text},'{item}'");
1097 }
1098 text = text.trim_start_matches(",").into();
1099 self.params
1100 .where_or
1101 .push(format!("{join_table}.{field} not in ({text})"));
1102 }
1103 "is" => {
1104 self.params
1105 .where_or
1106 .push(format!("{join_table}.{field} is {value}"));
1107 }
1108 "isnot" => {
1109 self.params
1110 .where_or
1111 .push(format!("{join_table}.{field} is not {value}"));
1112 }
1113 "in" => {
1114 if value.is_array() && value.is_empty() {
1115 self.params.where_or.push("1=0".to_string());
1116 return self;
1117 }
1118 let mut text = String::new();
1119 if value.is_array() {
1120 for item in value.members() {
1121 text = format!("{text},'{item}'");
1122 }
1123 } else {
1124 let value = value.as_str().unwrap_or("");
1125 let value: Vec<&str> = value.split(",").collect();
1126 for item in value.iter() {
1127 text = format!("{text},'{item}'");
1128 }
1129 }
1130 text = text.trim_start_matches(",").into();
1131 self.params
1132 .where_or
1133 .push(format!("{join_table}.{field} {compare} ({text})"));
1134 }
1135 "json_contains" => {
1139 if value.is_array() {
1140 if value.is_empty() {
1141 self.params.where_or.push("1=0".to_string());
1142 } else {
1143 let mut parts = vec![];
1144 for item in value.members() {
1145 let escaped = super::sql_safety::escape_string(&item.to_string());
1146 parts.push(format!(
1147 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1148 escaped
1149 ));
1150 }
1151 self.params
1152 .where_or
1153 .push(format!("({})", parts.join(" OR ")));
1154 }
1155 } else {
1156 let escaped = super::sql_safety::escape_string(&value.to_string());
1157 self.params.where_or.push(format!(
1158 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1159 escaped
1160 ));
1161 }
1162 }
1163 _ => {
1164 self.params
1165 .where_or
1166 .push(format!("{join_table}.{field} {compare} '{value}'"));
1167 }
1168 }
1169 self
1170 }
1171
1172 fn where_raw(&mut self, expr: &str) -> &mut Self {
1173 self.params.where_and.push(expr.to_string());
1174 self
1175 }
1176
1177 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1178 self.params
1179 .where_and
1180 .push(format!("\"{field}\" IN ({sub_sql})"));
1181 self
1182 }
1183
1184 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1185 self.params
1186 .where_and
1187 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1188 self
1189 }
1190
1191 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1192 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1193 self
1194 }
1195
1196 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1197 self.params
1198 .where_and
1199 .push(format!("NOT EXISTS ({sub_sql})"));
1200 self
1201 }
1202
1203 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1204 self.params.where_column = format!(
1205 "{}.{} {} {}.{}",
1206 self.params.table, field_a, compare, self.params.table, field_b
1207 );
1208 self
1209 }
1210
1211 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1212 self.params
1213 .update_column
1214 .push(format!("{field_a} = {compare}"));
1215 self
1216 }
1217
1218 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1219 self.params.page = page;
1220 self.params.limit = limit;
1221 self
1222 }
1223
1224 fn limit(&mut self, count: i32) -> &mut Self {
1225 self.params.limit_only = count;
1226 self
1227 }
1228
1229 fn column(&mut self, field: &str) -> JsonValue {
1230 self.field(field);
1231 let sql = self.params.select_sql();
1232
1233 if self.params.sql {
1234 return JsonValue::from(sql);
1235 }
1236 let (state, data) = self.query(sql.as_str());
1237 match state {
1238 true => {
1239 let mut list = array![];
1240 for item in data.members() {
1241 if self.params.json[field].is_empty() {
1242 let _ = list.push(item[field].clone());
1243 } else {
1244 let data =
1245 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1246 let _ = list.push(data);
1247 }
1248 }
1249 list
1250 }
1251 false => {
1252 array![]
1253 }
1254 }
1255 }
1256
1257 fn count(&mut self) -> JsonValue {
1258 self.params.fields = json::object! {};
1259 self.params.fields["count"] = "count(*) as count".to_string().into();
1260 let sql = self.params.select_sql();
1261 if self.params.sql {
1262 return JsonValue::from(sql.clone());
1263 }
1264 let (state, data) = self.query(sql.as_str());
1265 if state {
1266 data[0]["count"].clone()
1267 } else {
1268 JsonValue::from(0)
1269 }
1270 }
1271
1272 fn max(&mut self, field: &str) -> JsonValue {
1273 self.params.fields[field] = format!("max({field}) as {field}").into();
1274 let sql = self.params.select_sql();
1275 if self.params.sql {
1276 return JsonValue::from(sql.clone());
1277 }
1278 let (state, data) = self.query(sql.as_str());
1279 if state {
1280 if data.len() > 1 {
1281 return data.clone();
1282 }
1283 data[0][field].clone()
1284 } else {
1285 JsonValue::from(0)
1286 }
1287 }
1288
1289 fn min(&mut self, field: &str) -> JsonValue {
1290 self.params.fields[field] = format!("min({field}) as {field}").into();
1291 let sql = self.params.select_sql();
1292 if self.params.sql {
1293 return JsonValue::from(sql.clone());
1294 }
1295 let (state, data) = self.query(sql.as_str());
1296 if state {
1297 if data.len() > 1 {
1298 return data;
1299 }
1300 data[0][field].clone()
1301 } else {
1302 JsonValue::from(0)
1303 }
1304 }
1305
1306 fn sum(&mut self, field: &str) -> JsonValue {
1307 self.params.fields[field] = format!("sum({field}) as {field}").into();
1308 let sql = self.params.select_sql();
1309 if self.params.sql {
1310 return JsonValue::from(sql.clone());
1311 }
1312 let (state, data) = self.query(sql.as_str());
1313 match state {
1314 true => {
1315 if data.len() > 1 {
1316 return data;
1317 }
1318 data[0][field].clone()
1319 }
1320 false => JsonValue::from(0),
1321 }
1322 }
1323
1324 fn avg(&mut self, field: &str) -> JsonValue {
1325 self.params.fields[field] = format!("avg({field}) as {field}").into();
1326 let sql = self.params.select_sql();
1327 if self.params.sql {
1328 return JsonValue::from(sql.clone());
1329 }
1330 let (state, data) = self.query(sql.as_str());
1331 if state {
1332 if data.len() > 1 {
1333 return data;
1334 }
1335 data[0][field].clone()
1336 } else {
1337 JsonValue::from(0)
1338 }
1339 }
1340
1341 fn having(&mut self, expr: &str) -> &mut Self {
1342 self.params.having.push(expr.to_string());
1343 self
1344 }
1345
1346 fn select(&mut self) -> JsonValue {
1347 let sql = self.params.select_sql();
1348 if self.params.sql {
1349 return JsonValue::from(sql.clone());
1350 }
1351 let (state, mut data) = self.query(sql.as_str());
1352 match state {
1353 true => {
1354 for (field, _) in self.params.json.entries() {
1355 for item in data.members_mut() {
1356 if !item[field].is_empty() {
1357 let json = item[field].to_string();
1358 item[field] = match json::parse(&json) {
1359 Ok(e) => e,
1360 Err(_) => JsonValue::from(json),
1361 };
1362 }
1363 }
1364 }
1365 data.clone()
1366 }
1367 false => array![],
1368 }
1369 }
1370
1371 fn find(&mut self) -> JsonValue {
1372 self.params.page = 1;
1373 self.params.limit = 1;
1374 let sql = self.params.select_sql();
1375 if self.params.sql {
1376 return JsonValue::from(sql.clone());
1377 }
1378 let (state, mut data) = self.query(sql.as_str());
1379 match state {
1380 true => {
1381 if data.is_empty() {
1382 return object! {};
1383 }
1384 for (field, _) in self.params.json.entries() {
1385 if !data[0][field].is_empty() {
1386 let json = data[0][field].to_string();
1387 let json = json::parse(&json).unwrap_or(array![]);
1388 data[0][field] = json;
1389 } else {
1390 data[0][field] = array![];
1391 }
1392 }
1393 data[0].clone()
1394 }
1395 false => {
1396 object! {}
1397 }
1398 }
1399 }
1400
1401 fn value(&mut self, field: &str) -> JsonValue {
1402 self.params.fields = object! {};
1403 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1404 self.params.page = 1;
1405 self.params.limit = 1;
1406 let sql = self.params.select_sql();
1407 if self.params.sql {
1408 return JsonValue::from(sql.clone());
1409 }
1410 let (state, mut data) = self.query(sql.as_str());
1411 match state {
1412 true => {
1413 for (field, _) in self.params.json.entries() {
1414 if !data[0][field].is_empty() {
1415 let json = data[0][field].to_string();
1416 let json = json::parse(&json).unwrap_or(array![]);
1417 data[0][field] = json;
1418 } else {
1419 data[0][field] = array![];
1420 }
1421 }
1422 data[0][field].clone()
1423 }
1424 false => {
1425 if self.connection.debug {
1426 info!("{data:?}");
1427 }
1428 JsonValue::Null
1429 }
1430 }
1431 }
1432
1433 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1434 let fields_list = self.table_info(&self.params.table.clone());
1435 let mut fields = vec![];
1436 let mut values = vec![];
1437 if !self.params.autoinc && data["id"].is_empty() {
1438 let thread_id = format!("{:?}", std::thread::current().id());
1439 let thread_num: u64 = thread_id
1440 .trim_start_matches("ThreadId(")
1441 .trim_end_matches(")")
1442 .parse()
1443 .unwrap_or(0);
1444 data["id"] = format!(
1445 "{:X}{:X}",
1446 Local::now().timestamp_nanos_opt().unwrap_or(0),
1447 thread_num
1448 )
1449 .into();
1450 }
1451 for (field, value) in data.entries() {
1452 fields.push(format!("\"{}\"", field));
1453
1454 if value.is_string() {
1455 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1456 continue;
1457 } else if value.is_array() {
1458 if self.params.json[field].is_empty() {
1459 let array = value
1460 .members()
1461 .map(|x| x.as_str().unwrap_or(""))
1462 .collect::<Vec<&str>>()
1463 .join(",");
1464 values.push(format!("'{}'", array.replace("'", "''")));
1465 } else {
1466 let json = value.to_string();
1467 let json = json.replace("'", "''");
1468 values.push(format!("'{json}'"));
1469 }
1470 continue;
1471 } else if value.is_object() {
1472 if self.params.json[field].is_empty() {
1473 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1474 } else {
1475 let json = value.to_string();
1476 let json = json.replace("'", "''");
1477 values.push(format!("'{json}'"));
1478 }
1479 continue;
1480 } else if value.is_number() {
1481 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1482 if col_type == "boolean" {
1483 let bool_val = value.as_i64().unwrap_or(0) != 0;
1484 values.push(format!("{bool_val}"));
1485 } else if col_type.contains("int") {
1486 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1487 } else {
1488 values.push(format!("{value}"));
1489 }
1490 continue;
1491 } else if value.is_boolean() || value.is_null() {
1492 values.push(format!("{value}"));
1493 continue;
1494 } else {
1495 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1496 continue;
1497 }
1498 }
1499 let fields = fields.join(",");
1500 let values = values.join(",");
1501
1502 let sql = format!(
1503 "INSERT INTO {} ({}) VALUES ({});",
1504 self.params.table, fields, values
1505 );
1506 if self.params.sql {
1507 return JsonValue::from(sql.clone());
1508 }
1509 let (state, ids) = self.execute(sql.as_str());
1510
1511 match state {
1512 true => match self.params.autoinc {
1513 true => ids.clone(),
1514 false => data["id"].clone(),
1515 },
1516 false => {
1517 let thread_id = format!("{:?}", thread::current().id());
1518 error!("添加失败: {thread_id} {ids:?} {sql}");
1519 JsonValue::from("")
1520 }
1521 }
1522 }
1523 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1524 let fields_list = self.table_info(&self.params.table.clone());
1525 let mut fields = String::new();
1526 if !self.params.autoinc && data[0]["id"].is_empty() {
1527 data[0]["id"] = "".into();
1528 }
1529 for (field, _) in data[0].entries() {
1530 fields = format!("{fields},\"{field}\"");
1531 }
1532 fields = fields.trim_start_matches(",").to_string();
1533
1534 let core_count = num_cpus::get();
1535 let mut p = pools::Pool::new(core_count * 4);
1536
1537 let autoinc = self.params.autoinc;
1538 for list in data.members() {
1539 let mut item = list.clone();
1540 let i = br_fields::str::Code::verification_code(3);
1541 let fields_list_new = fields_list.clone();
1542 p.execute(move |pcindex| {
1543 if !autoinc && item["id"].is_empty() {
1544 let id = format!(
1545 "{:X}{:X}{}",
1546 Local::now().timestamp_nanos_opt().unwrap_or(0),
1547 pcindex,
1548 i
1549 );
1550 item["id"] = id.into();
1551 }
1552 let mut values = "".to_string();
1553 for (field, value) in item.entries() {
1554 if value.is_string() {
1555 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1556 } else if value.is_number() {
1557 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1558 if col_type == "boolean" {
1559 let bool_val = value.as_i64().unwrap_or(0) != 0;
1560 values = format!("{values},{bool_val}");
1561 } else if col_type.contains("int") {
1562 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1563 } else {
1564 values = format!("{values},{value}");
1565 }
1566 } else if value.is_boolean() {
1567 values = format!("{values},{value}");
1568 continue;
1569 } else {
1570 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1571 }
1572 }
1573 values = format!("({})", values.trim_start_matches(","));
1574 array![item["id"].clone(), values]
1575 });
1576 }
1577 let (ids_list, mut values) = p.insert_all();
1578 values = values.trim_start_matches(",").to_string();
1579 let sql = format!(
1580 "INSERT INTO {} ({}) VALUES {};",
1581 self.params.table, fields, values
1582 );
1583
1584 if self.params.sql {
1585 return JsonValue::from(sql.clone());
1586 }
1587 let (state, data) = self.execute(sql.as_str());
1588 match state {
1589 true => match autoinc {
1590 true => data,
1591 false => JsonValue::from(ids_list),
1592 },
1593 false => {
1594 error!("insert_all: {data:?}");
1595
1596 array![]
1597 }
1598 }
1599 }
1600 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1601 let fields_list = self.table_info(&self.params.table.clone());
1602 let mut fields = vec![];
1603 let mut values = vec![];
1604 if !self.params.autoinc && data["id"].is_empty() {
1605 let thread_id = format!("{:?}", std::thread::current().id());
1606 let thread_num: u64 = thread_id
1607 .trim_start_matches("ThreadId(")
1608 .trim_end_matches(")")
1609 .parse()
1610 .unwrap_or(0);
1611 data["id"] = format!(
1612 "{:X}{:X}",
1613 Local::now().timestamp_nanos_opt().unwrap_or(0),
1614 thread_num
1615 )
1616 .into();
1617 }
1618 for (field, value) in data.entries() {
1619 fields.push(format!("\"{}\"", field));
1620
1621 if value.is_string() {
1622 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1623 continue;
1624 } else if value.is_array() {
1625 if self.params.json[field].is_empty() {
1626 let array = value
1627 .members()
1628 .map(|x| x.as_str().unwrap_or(""))
1629 .collect::<Vec<&str>>()
1630 .join(",");
1631 values.push(format!("'{}'", array.replace("'", "''")));
1632 } else {
1633 let json = value.to_string();
1634 let json = json.replace("'", "''");
1635 values.push(format!("'{json}'"));
1636 }
1637 continue;
1638 } else if value.is_object() {
1639 if self.params.json[field].is_empty() {
1640 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1641 } else {
1642 let json = value.to_string();
1643 let json = json.replace("'", "''");
1644 values.push(format!("'{json}'"));
1645 }
1646 continue;
1647 } else if value.is_number() {
1648 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1649 if col_type == "boolean" {
1650 let bool_val = value.as_i64().unwrap_or(0) != 0;
1651 values.push(format!("{bool_val}"));
1652 } else if col_type.contains("int") {
1653 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1654 } else {
1655 values.push(format!("{value}"));
1656 }
1657 continue;
1658 } else if value.is_boolean() || value.is_null() {
1659 values.push(format!("{value}"));
1660 continue;
1661 } else {
1662 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1663 continue;
1664 }
1665 }
1666
1667 let conflict_cols: Vec<String> = conflict_fields
1668 .iter()
1669 .map(|f| format!("\"{}\"", f))
1670 .collect();
1671
1672 let update_set: Vec<String> = fields
1673 .iter()
1674 .filter(|f| {
1675 let name = f.trim_matches('"');
1676 !conflict_fields.contains(&name) && name != "id"
1677 })
1678 .map(|f| format!("{f}=EXCLUDED.{f}"))
1679 .collect();
1680
1681 let fields_str = fields.join(",");
1682 let values_str = values.join(",");
1683
1684 let sql = format!(
1685 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1686 self.params.table,
1687 fields_str,
1688 values_str,
1689 conflict_cols.join(","),
1690 update_set.join(",")
1691 );
1692 if self.params.sql {
1693 return JsonValue::from(sql.clone());
1694 }
1695 let (state, result) = self.execute(sql.as_str());
1696 match state {
1697 true => match self.params.autoinc {
1698 true => result.clone(),
1699 false => data["id"].clone(),
1700 },
1701 false => {
1702 let thread_id = format!("{:?}", thread::current().id());
1703 error!("upsert失败: {thread_id} {result:?} {sql}");
1704 JsonValue::from("")
1705 }
1706 }
1707 }
1708 fn update(&mut self, data: JsonValue) -> JsonValue {
1709 let fields_list = self.table_info(&self.params.table.clone());
1710 let mut values = vec![];
1711 for (field, value) in data.entries() {
1712 if value.is_string() {
1713 values.push(format!(
1714 "\"{}\"='{}'",
1715 field,
1716 value.to_string().replace("'", "''")
1717 ));
1718 } else if value.is_number() {
1719 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1720 if col_type == "boolean" {
1721 let bool_val = value.as_i64().unwrap_or(0) != 0;
1722 values.push(format!("\"{field}\"= {bool_val}"));
1723 } else if col_type.contains("int") {
1724 values.push(format!(
1725 "\"{}\"= {}",
1726 field,
1727 value.as_f64().unwrap_or(0.0) as i64
1728 ));
1729 } else {
1730 values.push(format!("\"{field}\"= {value}"));
1731 }
1732 } else if value.is_array() {
1733 if self.params.json[field].is_empty() {
1734 let array = value
1735 .members()
1736 .map(|x| x.as_str().unwrap_or(""))
1737 .collect::<Vec<&str>>()
1738 .join(",");
1739 values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1740 } else {
1741 let json = value.to_string();
1742 let json = json.replace("'", "''");
1743 values.push(format!("\"{field}\"='{json}'"));
1744 }
1745 continue;
1746 } else if value.is_object() {
1747 if self.params.json[field].is_empty() {
1748 values.push(format!("\"{}\"='{}'", field, value.to_string().replace("'", "''")));
1749 } else {
1750 if value.is_empty() {
1751 values.push(format!("\"{field}\"=''"));
1752 continue;
1753 }
1754 let json = value.to_string();
1755 let json = json.replace("'", "''");
1756 values.push(format!("\"{field}\"='{json}'"));
1757 }
1758 continue;
1759 } else if value.is_boolean() || value.is_null() {
1760 values.push(format!("\"{field}\"= {value}"));
1761 } else {
1762 values.push(format!("\"{field}\"=\"{value}\""));
1763 }
1764 }
1765
1766 for (field, value) in self.params.inc_dec.entries() {
1767 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1768 }
1769 if !self.params.update_column.is_empty() {
1770 values.extend(self.params.update_column.clone());
1771 }
1772 let values = values.join(",");
1773
1774 let sql = format!(
1775 "UPDATE {} SET {} {};",
1776 self.params.table.clone(),
1777 values,
1778 self.params.where_sql()
1779 );
1780 if self.params.sql {
1781 return JsonValue::from(sql.clone());
1782 }
1783 let (state, data) = self.execute(sql.as_str());
1784 if state {
1785 data
1786 } else {
1787 let thread_id = format!("{:?}", thread::current().id());
1788 error!("update: {thread_id} {data:?} {sql}");
1789 0.into()
1790 }
1791 }
1792 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1793 let fields_list = self.table_info(&self.params.table.clone());
1794 let mut values = vec![];
1795
1796 let mut ids = vec![];
1797 for (field, _) in data[0].entries() {
1798 if field == "id" {
1799 continue;
1800 }
1801 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1802 let mut fields = vec![];
1803 for row in data.members() {
1804 let value = row[field].clone();
1805 let id = row["id"].clone();
1806 ids.push(id.clone());
1807 if value.is_string() {
1808 fields.push(format!(
1809 "WHEN '{}' THEN '{}'",
1810 id,
1811 value.to_string().replace("'", "''")
1812 ));
1813 } else if value.is_array() || value.is_object() {
1814 if self.params.json[field].is_empty() {
1815 fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1816 } else {
1817 let json = value.to_string();
1818 let json = json.replace("'", "''");
1819 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1820 }
1821 continue;
1822 } else if value.is_number() {
1823 if col_type == "boolean" {
1824 let bool_val = value.as_i64().unwrap_or(0) != 0;
1825 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1826 } else {
1827 fields.push(format!("WHEN '{id}' THEN {value}"));
1828 }
1829 } else if value.is_boolean() || value.is_null() {
1830 fields.push(format!("WHEN '{id}' THEN {value}"));
1831 } else {
1832 fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1833 }
1834 }
1835 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1836 }
1837 self.where_and("id", "in", ids.into());
1838 for (field, value) in self.params.inc_dec.entries() {
1839 values.push(format!("{} = {}", field, value.to_string().clone()));
1840 }
1841
1842 let values = values.join(",");
1843 let sql = format!(
1844 "UPDATE {} SET {} {} {};",
1845 self.params.table.clone(),
1846 values,
1847 self.params.where_sql(),
1848 self.params.page_limit_sql()
1849 );
1850 if self.params.sql {
1851 return JsonValue::from(sql.clone());
1852 }
1853 let (state, data) = self.execute(sql.as_str());
1854 if state {
1855 data
1856 } else {
1857 error!("update_all: {data:?}");
1858 JsonValue::from(0)
1859 }
1860 }
1861
1862 fn delete(&mut self) -> JsonValue {
1863 let sql = format!(
1864 "delete FROM {} {} {};",
1865 self.params.table.clone(),
1866 self.params.where_sql(),
1867 self.params.page_limit_sql()
1868 );
1869 if self.params.sql {
1870 return JsonValue::from(sql.clone());
1871 }
1872 let (state, data) = self.execute(sql.as_str());
1873 match state {
1874 true => data,
1875 false => {
1876 error!("delete 失败>>> {data:?}");
1877 JsonValue::from(0)
1878 }
1879 }
1880 }
1881
1882 fn transaction(&mut self) -> bool {
1883 let thread_id = format!("{:?}", thread::current().id());
1884 let key = format!("{}{}", self.default, thread_id);
1885
1886 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1887 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1888 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1889 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1890 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1891 return true;
1892 }
1893
1894 let mut conn = None;
1896 for attempt in 0..3u8 {
1897 match self.client.get_connect_for_transaction() {
1898 Ok(mut c) => {
1899 if c.is_valid() {
1900 conn = Some(c);
1901 break;
1902 }
1903 warn!("事务连接无效(第{}次)", attempt + 1);
1904 self.client.release_transaction_conn();
1905 }
1906 Err(e) => {
1907 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1908 }
1909 }
1910 if attempt < 2 {
1911 thread::sleep(std::time::Duration::from_millis(200));
1912 }
1913 }
1914 let mut conn = match conn {
1915 Some(c) => c,
1916 None => {
1917 error!("获取事务连接重试耗尽");
1918 return false;
1919 }
1920 };
1921
1922 if let Err(e) = conn.execute("START TRANSACTION") {
1923 error!("启动事务失败: {e}");
1924 self.client.release_transaction_conn();
1925 return false;
1926 }
1927
1928
1929 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1930 true
1931 }
1932 fn commit(&mut self) -> bool {
1933 let thread_id = format!("{:?}", thread::current().id());
1934 let key = format!("{}{}", self.default, thread_id);
1935
1936 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1937 error!("commit: 没有活跃的事务");
1938 return false;
1939 }
1940
1941 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1942 if depth > 1 {
1943 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1944 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1945 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1946 return true;
1947 }
1948
1949 let commit_result =
1950 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1951
1952 let success = match commit_result {
1953 Some(Ok(_)) => true,
1954 Some(Err(e)) => {
1955 error!("提交事务失败: {e}");
1956 false
1957 }
1958 None => {
1959 error!("提交事务失败: 未找到连接");
1960 false
1961 }
1962 };
1963
1964 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
1965 self.client.release_transaction_conn_with_conn(conn);
1966 } else {
1967 self.client.release_transaction_conn();
1968 }
1969 success
1970 }
1971
1972 fn rollback(&mut self) -> bool {
1973 let thread_id = format!("{:?}", thread::current().id());
1974 let key = format!("{}{}", self.default, thread_id);
1975
1976 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1977 error!("rollback: 没有活跃的事务");
1978 return false;
1979 }
1980
1981 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1982 if depth > 1 {
1983 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1984 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1985 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1986 return true;
1987 }
1988
1989 let rollback_result =
1990 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1991
1992 let success = match rollback_result {
1993 Some(Ok(_)) => true,
1994 Some(Err(e)) => {
1995 error!("回滚失败: {e}");
1996 false
1997 }
1998 None => {
1999 error!("回滚失败: 未找到连接");
2000 false
2001 }
2002 };
2003
2004 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
2005 self.client.release_transaction_conn_with_conn(conn);
2006 } else {
2007 self.client.release_transaction_conn();
2008 }
2009 success
2010 }
2011
2012 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2013 let (state, data) = self.query(sql);
2014 match state {
2015 true => Ok(data),
2016 false => Err(data.to_string()),
2017 }
2018 }
2019
2020 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2021 let (state, data) = self.execute(sql);
2022 match state {
2023 true => Ok(data),
2024 false => Err(data.to_string()),
2025 }
2026 }
2027
2028 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2029 self.params.inc_dec[field] = format!("{field} + {num}").into();
2030 self
2031 }
2032
2033 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2034 self.params.inc_dec[field] = format!("{field} - {num}").into();
2035 self
2036 }
2037 fn buildsql(&mut self) -> String {
2038 self.fetch_sql();
2039 let sql = self.select().to_string();
2040 format!("( {} ) {}", sql, self.params.table)
2041 }
2042
2043 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2044 for field in fields {
2045 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2046 }
2047 self
2048 }
2049
2050 fn join(
2051 &mut self,
2052 main_table: &str,
2053 main_fields: &str,
2054 right_table: &str,
2055 right_fields: &str,
2056 ) -> &mut Self {
2057 let main_table = if main_table.is_empty() {
2058 self.params.table.clone()
2059 } else {
2060 main_table.to_string()
2061 };
2062 self.params.join_table = right_table.to_string();
2063 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2064 self
2065 }
2066
2067 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2068 let main_fields = if main_fields.is_empty() {
2069 "id"
2070 } else {
2071 main_fields
2072 };
2073 let second_fields = if second_fields.is_empty() {
2074 self.params.table.clone()
2075 } else {
2076 second_fields.to_string().clone()
2077 };
2078 let sec_table_name = format!("{}{}", table, "_2");
2079 let second_table = format!("{} {}", table, sec_table_name.clone());
2080 self.params.join_table = sec_table_name.clone();
2081 self.params.join.push(format!(
2082 " INNER JOIN {} ON {}.{} = {}.{}",
2083 second_table, self.params.table, main_fields, sec_table_name, second_fields
2084 ));
2085 self
2086 }
2087
2088 fn join_right(
2089 &mut self,
2090 main_table: &str,
2091 main_fields: &str,
2092 right_table: &str,
2093 right_fields: &str,
2094 ) -> &mut Self {
2095 let main_table = if main_table.is_empty() {
2096 self.params.table.clone()
2097 } else {
2098 main_table.to_string()
2099 };
2100 self.params.join_table = right_table.to_string();
2101 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2102 self
2103 }
2104
2105 fn join_full(
2106 &mut self,
2107 main_table: &str,
2108 main_fields: &str,
2109 right_table: &str,
2110 right_fields: &str,
2111 ) -> &mut Self {
2112 let main_table = if main_table.is_empty() {
2113 self.params.table.clone()
2114 } else {
2115 main_table.to_string()
2116 };
2117 self.params.join_table = right_table.to_string();
2118 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2119 self
2120 }
2121
2122 fn union(&mut self, sub_sql: &str) -> &mut Self {
2123 self.params.unions.push(format!("UNION {sub_sql}"));
2124 self
2125 }
2126
2127 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2128 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2129 self
2130 }
2131
2132 fn lock_for_update(&mut self) -> &mut Self {
2133 self.params.lock_mode = "FOR UPDATE".to_string();
2134 self
2135 }
2136
2137 fn lock_for_share(&mut self) -> &mut Self {
2138 self.params.lock_mode = "FOR SHARE".to_string();
2139 self
2140 }
2141}