1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
11#[derive(Clone)]
12pub struct Pgsql {
13 pub connection: Connection,
15 pub default: String,
17 pub params: Params,
18 pub client: Pools,
19}
20
21impl Pgsql {
22 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23 let port = connection
24 .hostport
25 .parse::<i32>()
26 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28 let cp_connection = connection.clone();
29 let config = object! {
30 debug: cp_connection.debug,
31 username: cp_connection.username,
32 userpass: cp_connection.userpass,
33 database: cp_connection.database,
34 hostname: cp_connection.hostname,
35 hostport: port,
36 charset: cp_connection.charset.str(),
37 };
38 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40 let pools = pgsql.pools()?;
41 Ok(Self {
42 connection,
43 default: default.clone(),
44 params: Params::default("pgsql"),
45 client: pools,
46 })
47 }
48
49 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
50 let thread_id = format!("{:?}", thread::current().id());
51 let key = format!("{}{}", self.default, thread_id);
52
53 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
54 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
55
56 match result {
57 Some(Ok(e)) => {
58 if self.connection.debug {
59 info!("查询成功: {} {}", thread_id.clone(), sql);
60 }
61 (true, e.rows)
62 }
63 Some(Err(e)) => {
64 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
65 (false, JsonValue::from(e.to_string()))
66 }
67 None => {
68 error!("事务查询失败: 未找到事务连接 {thread_id}");
69 (false, JsonValue::from("未找到事务连接"))
70 }
71 }
72 } else {
73 const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
74 let mut last_err = String::new();
75 for attempt in 0..3u8 {
76 let mut guard = match self.client.get_guard() {
77 Ok(g) => g,
78 Err(e) => {
79 if attempt == 2 {
80 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
81 return (false, JsonValue::from(e.to_string()));
82 }
83 let backoff = RETRY_BACKOFF_MS[attempt as usize];
84 warn!(
85 "非事务查询 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
86 attempt + 1,
87 backoff
88 );
89 thread::sleep(std::time::Duration::from_millis(backoff));
90 continue;
91 }
92 };
93 match guard.conn().query(sql) {
94 Ok(e) => {
95 if self.connection.debug {
96 info!("查询成功: {} {}", thread_id.clone(), sql);
97 }
98 return (true, e.rows);
99 }
100 Err(ref e) if Self::is_retriable_error(e) => {
101 last_err = e.to_string();
102 guard.discard();
103 if attempt < 2 {
104 let backoff = RETRY_BACKOFF_MS[attempt as usize];
105 warn!(
106 "非事务查询连接断开(第{}次, {}ms后重试): {thread_id} {e}",
107 attempt + 1,
108 backoff
109 );
110 thread::sleep(std::time::Duration::from_millis(backoff));
111 } else {
112 warn!(
113 "非事务查询连接断开(第{}次, 不再重试): {thread_id} {e}",
114 attempt + 1
115 );
116 }
117 }
118 Err(e) => {
119 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
120 return (false, JsonValue::from(e.to_string()));
121 }
122 }
123 }
124 error!("非事务查询重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
125 (false, JsonValue::from(last_err))
126 }
127 }
128 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
129 let thread_id = format!("{:?}", thread::current().id());
130 let key = format!("{}{}", self.default, thread_id);
131
132 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
133 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
134
135 match result {
136 Some(Ok(e)) => {
137 if self.connection.debug {
138 info!("提交成功: {} {}", thread_id.clone(), sql);
139 }
140 if sql.contains("INSERT") {
141 (true, e.rows)
142 } else {
143 (true, e.affect_count.into())
144 }
145 }
146 Some(Err(e)) => {
147 error!("事务提交失败: {thread_id} {e}");
148 (false, JsonValue::from(e.to_string()))
149 }
150 None => {
151 error!("事务执行失败: 未找到事务连接 {thread_id}");
152 (false, JsonValue::from("未找到事务连接"))
153 }
154 }
155 } else {
156 const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
160 let mut last_err = String::new();
161 for attempt in 0..3u8 {
162 let mut guard = match self.client.get_guard() {
163 Ok(g) => g,
164 Err(e) => {
165 if attempt == 2 {
166 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
167 return (false, JsonValue::from(e.to_string()));
168 }
169 let backoff = RETRY_BACKOFF_MS[attempt as usize];
170 warn!(
171 "非事务执行 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
172 attempt + 1,
173 backoff
174 );
175 thread::sleep(std::time::Duration::from_millis(backoff));
176 continue;
177 }
178 };
179 match guard.conn().execute(sql) {
180 Ok(e) => {
181 if self.connection.debug {
182 info!("提交成功: {} {}", thread_id.clone(), sql);
183 }
184 if sql.contains("INSERT") {
185 return (true, e.rows);
186 } else {
187 return (true, e.affect_count.into());
188 }
189 }
190 Err(ref e) if Self::is_retriable_error(e) => {
191 last_err = e.to_string();
192 guard.discard();
193 if attempt < 2 {
194 let backoff = RETRY_BACKOFF_MS[attempt as usize];
195 warn!(
196 "非事务执行连接断开(第{}次, {}ms后重试): {thread_id} {e}",
197 attempt + 1,
198 backoff
199 );
200 thread::sleep(std::time::Duration::from_millis(backoff));
201 } else {
202 warn!(
203 "非事务执行连接断开(第{}次, 不再重试): {thread_id} {e}",
204 attempt + 1
205 );
206 }
207 }
208 Err(e) => {
209 error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
210 return (false, JsonValue::from(e.to_string()));
211 }
212 }
213 }
214 error!("非事务执行重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
215 (false, JsonValue::from(last_err))
216 }
217 }
218
219 fn is_retriable_error(e: &PgsqlError) -> bool {
221 matches!(
222 e,
223 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
224 )
225 }
226}
227
228impl DbMode for Pgsql {
229 fn database_tables(&mut self) -> JsonValue {
230 let sql = "SHOW TABLES".to_string();
231 match self.sql(sql.as_str()) {
232 Ok(e) => {
233 let mut list = vec![];
234 for item in e.members() {
235 for (_, value) in item.entries() {
236 list.push(value.clone());
237 }
238 }
239 list.into()
240 }
241 Err(_) => {
242 array![]
243 }
244 }
245 }
246
247 fn database_create(&mut self, name: &str) -> bool {
248 let sql = format!("CREATE DATABASE {name}");
249
250 let (state, data) = self.execute(sql.as_str());
251 match state {
252 true => data.as_bool().unwrap_or(true),
253 false => {
254 error!("创建数据库失败: {data:?}");
255 false
256 }
257 }
258 }
259
260 fn truncate(&mut self, table: &str) -> bool {
261 let sql = format!("TRUNCATE TABLE {table}");
262 let (state, _) = self.execute(sql.as_str());
263 state
264 }
265}
266
267impl Mode for Pgsql {
268 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
269 let mut sql = String::new();
270 let mut comments = vec![];
271
272 if !options.table_unique.is_empty() {
273 let full_name = format!(
274 "{}_unique_{}",
275 options.table_name,
276 options.table_unique.join("_")
277 );
278 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
279 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
280 let unique = format!(
281 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
282 name,
283 options.table_name,
284 options.table_unique.join(",")
285 );
286 comments.push(unique);
287 }
288
289 for row in options.table_index.iter() {
290 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
291 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
292 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
293 let index = format!(
294 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
295 name,
296 options.table_name,
297 row.join(",")
298 );
299 comments.push(index);
300 }
301
302 for (name, field) in options.table_fields.entries_mut() {
303 field["table_name"] = options.table_name.clone().into();
304 let row = br_fields::field("pgsql", name, field.clone());
305 let (col_sql, meta) = if let Some(idx) = row.find("--") {
306 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
307 } else {
308 (row.trim(), None)
309 };
310 if let Some(meta) = meta {
311 comments.push(format!(
312 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
313 options.table_name, name, meta
314 ));
315 }
316 sql = format!("{} {},\r\n", sql, col_sql);
317 }
318
319 let primary_key = format!(
320 "CONSTRAINT {}_{} PRIMARY KEY ({})",
321 options.table_name, options.table_key, options.table_key
322 );
323 let sql = format!(
324 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
325 options.table_name,
326 sql.trim_end_matches(",\r\n"),
327 primary_key
328 );
329 comments.insert(0, sql);
330
331 for (_name, field) in options.table_fields.entries() {
332 let _ = field["mode"].as_str();
333 }
334
335 if self.params.sql {
336 let info = comments.join("\r\n");
337 return JsonValue::from(info);
338 }
339 for comment in comments {
340 let (state, _) = self.execute(comment.as_str());
341 match state {
342 true => {}
343 false => {
344 return JsonValue::from(state);
345 }
346 }
347 }
348 JsonValue::from(true)
349 }
350
351 fn table_update(&mut self, options: TableOptions) -> JsonValue {
352 let fields_list = self.table_info(&options.table_name);
353 let mut put = vec![];
354 let mut add = vec![];
355 let mut del = vec![];
356 let mut comments = vec![];
357
358 for (key, _) in fields_list.entries() {
359 if options.table_fields[key].is_empty() {
360 del.push(key);
361 }
362 }
363 for (name, field) in options.table_fields.entries() {
364 if !fields_list[name].is_empty() {
365 let old_info = &fields_list[name];
366 let new_field_sql = br_fields::field("pgsql", name, field.clone());
367
368 let old_comment = old_info["comment"].as_str().unwrap_or("");
369 let old_type = old_info["type"].as_str().unwrap_or("");
370
371 let new_comment = if let Some(idx) = new_field_sql.find("--") {
372 new_field_sql[idx + 2..].trim()
373 } else {
374 ""
375 };
376
377 let comment_matches =
378 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
379 let old_parts: Vec<&str> = old_comment.split('|').collect();
380 let new_parts: Vec<&str> = new_comment.split('|').collect();
381 if old_parts.len() >= 4 && new_parts.len() >= 4 {
382 old_parts[..4] == new_parts[..4]
383 } else {
384 old_comment == new_comment
385 }
386 } else if !old_comment.is_empty() && !new_comment.is_empty() {
387 let old_parts: Vec<&str> = old_comment.split('|').collect();
388 let new_parts: Vec<&str> = new_comment.split('|').collect();
389 if old_parts.len() >= 2
390 && new_parts.len() >= 2
391 && old_parts.len() == new_parts.len()
392 {
393 let old_filtered: Vec<&str> = old_parts
394 .iter()
395 .filter(|v| **v != "true" && **v != "false")
396 .copied()
397 .collect();
398 let new_filtered: Vec<&str> = new_parts
399 .iter()
400 .filter(|v| **v != "true" && **v != "false")
401 .copied()
402 .collect();
403 old_filtered == new_filtered
404 } else {
405 old_comment == new_comment
406 }
407 } else {
408 old_comment == new_comment
409 };
410
411 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
412 let new_type = if sql_parts.len() > 1 {
413 sql_parts[1].to_lowercase()
414 } else {
415 String::new()
416 };
417
418 let type_matches = match old_type {
419 "integer" => {
420 new_type.contains("int")
421 && !new_type.contains("bigint")
422 && !new_type.contains("smallint")
423 }
424 "bigint" => new_type.contains("bigint"),
425 "smallint" => new_type.contains("smallint"),
426 "boolean" => new_type.contains("boolean"),
427 "text" => new_type.contains("text"),
428 "character varying" => {
429 if !new_type.contains("varchar") {
430 false
431 } else {
432 let old_len = old_info["max_length"].as_i64().unwrap_or(0);
433 let new_len = new_type
434 .trim_start_matches("varchar(")
435 .trim_end_matches(')')
436 .parse::<i64>()
437 .unwrap_or(0);
438 let matched = old_len == new_len || new_len == 0;
439 if !matched {
440 log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
441 }
442 old_len == new_len || new_len == 0
443 }
444 }
445 "character" => new_type.contains("char") && !new_type.contains("varchar"),
446 "numeric" => {
447 if !(new_type.contains("numeric") || new_type.contains("decimal")) {
448 false
449 } else {
450 let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
451 let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
452 let inner = new_type
453 .replace("numeric(", "")
454 .replace("decimal(", "")
455 .replace(')', "");
456 let parts: Vec<&str> = inner.split(',').collect();
457 let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
458 let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
459 old_prec == new_prec && old_scale == new_scale
460 }
461 }
462 "double precision" => {
463 new_type.contains("double") || new_type.contains("float8")
464 }
465 "real" => new_type.contains("real") || new_type.contains("float4"),
466 "timestamp without time zone" | "timestamp with time zone" => {
467 new_type.contains("timestamp")
468 }
469 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
470 "time without time zone" | "time with time zone" => {
471 new_type.contains("time") && !new_type.contains("timestamp")
472 }
473 "json" | "jsonb" => new_type.contains("json"),
474 "uuid" => new_type.contains("uuid"),
475 "bytea" => new_type.contains("bytea"),
476 _ => old_type == new_type,
477 };
478
479 if type_matches && comment_matches {
480 continue;
481 }
482
483 log::debug!(
484 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
485 options.table_name,
486 name,
487 type_matches,
488 old_type,
489 new_type,
490 comment_matches
491 );
492 put.push(name);
493 } else {
494 add.push(name);
495 }
496 }
497
498 for name in add.iter() {
499 let name = name.to_string();
500 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
501 let rows = row.split("--").collect::<Vec<&str>>();
502 comments.push(format!(
503 r#"ALTER TABLE "{}" add {};"#,
504 options.table_name,
505 rows[0].trim()
506 ));
507 if rows.len() > 1 {
508 comments.push(format!(
509 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
510 options.table_name,
511 name,
512 rows[1].trim()
513 ));
514 }
515 }
516 for name in del.iter() {
517 comments.push(format!(
518 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
519 options.table_name, name
520 ));
521 }
522 for name in put.iter() {
523 let name = name.to_string();
524 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
525 let rows = row.split("--").collect::<Vec<&str>>();
526
527 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
528
529 if sql[1].contains("BOOLEAN") {
530 let text = format!(
531 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
532 options.table_name, name
533 );
534 comments.push(text.clone());
535 let text = format!(
536 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
537 options.table_name, name, sql[1]
538 );
539 comments.push(text.clone());
540 } else {
541 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
542 let new_type_lower = sql[1].to_lowercase();
543 let is_date_to_numeric = (old_col_type == "date"
544 || old_col_type.contains("timestamp"))
545 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
546 if is_date_to_numeric {
547 comments.push(format!(
548 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
549 options.table_name, name
550 ));
551 comments.push(format!(
552 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
553 options.table_name, name, sql[1], name, name, name
554 ));
555 } else {
556 let text = format!(
557 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
558 options.table_name, name, sql[1]
559 );
560 comments.push(text.clone());
561 }
562 };
563
564 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
565 let default_value = rows[0][default_pos + 9..].trim();
566 if !default_value.is_empty() {
567 comments.push(format!(
568 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
569 options.table_name, name, default_value
570 ));
571 }
572 }
573 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
576 .as_str()
577 .unwrap_or("YES");
578 let old_is_required = old_is_nullable == "NO";
579
580 if old_is_required && name != options.table_key {
582 comments.push(format!(
583 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
584 options.table_name, name
585 ));
586 }
587
588 if rows.len() > 1 {
589 comments.push(format!(
590 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
591 options.table_name,
592 name,
593 rows[1].trim()
594 ));
595 }
596 }
597
598 let mut unique_new = vec![];
599 let mut index_new = vec![];
600 let mut primary_key = vec![];
601 let (_, index_list) = self.query(
602 format!(
603 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
604 options.table_name
605 )
606 .as_str(),
607 );
608 for item in index_list.members() {
609 let key_name = item["indexname"].as_str().unwrap_or("");
610 let indexdef = item["indexdef"].to_string();
611
612 if indexdef.contains(
613 format!(
614 "CREATE UNIQUE INDEX {}_{} ON",
615 options.table_name, options.table_key
616 )
617 .as_str(),
618 ) {
619 primary_key.push(key_name.to_string());
620 continue;
621 }
622 if indexdef.contains("CREATE UNIQUE INDEX") {
623 unique_new.push(key_name.to_string());
624 continue;
625 }
626 if indexdef.contains("CREATE INDEX") {
627 index_new.push(key_name.to_string());
628 continue;
629 }
630 }
631
632 if !options.table_unique.is_empty() {
633 let full_name = format!(
634 "{}_unique_{}",
635 options.table_name,
636 options.table_unique.join("_")
637 );
638 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
639 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
640 let unique = format!(
641 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
642 name,
643 options.table_name,
644 options.table_unique.join(",")
645 );
646 if !unique_new.contains(&name) {
647 comments.push(unique);
648 }
649 unique_new.retain(|x| *x != name);
650 }
651
652 for row in options.table_index.iter() {
653 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
654 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
655 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
656 let index = format!(
657 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
658 name,
659 options.table_name,
660 row.join(",")
661 );
662 if !index_new.contains(&name) {
663 comments.push(index);
664 }
665 index_new.retain(|x| *x != name);
666 }
667
668 for item in unique_new {
669 if item.ends_with("_pkey") {
670 continue;
671 }
672 if item.starts_with("unique_") {
673 comments.push(format!(
674 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
675 options.table_name,
676 item.clone()
677 ));
678 } else {
679 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
680 }
681 }
682 for item in index_new {
683 if item.ends_with("_pkey") {
684 continue;
685 }
686 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
687 }
688
689 if self.params.sql {
690 return JsonValue::from(comments.join(""));
691 }
692
693 if comments.is_empty() {
694 return JsonValue::from(-1);
695 }
696
697 for item in comments.iter() {
698 let (state, res) = self.execute(item.as_str());
699 match state {
700 true => {}
701 false => {
702 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
703 return JsonValue::from(0);
704 }
705 }
706 }
707 JsonValue::from(1)
708 }
709
710 fn table_info(&mut self, table: &str) -> JsonValue {
711 let sql = format!(
712 "SELECT COL.COLUMN_NAME,
713 COL.DATA_TYPE,
714 COL.IS_NULLABLE,
715 COL.CHARACTER_MAXIMUM_LENGTH,
716 COL.NUMERIC_PRECISION,
717 COL.NUMERIC_SCALE,
718 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
719 LEFT JOIN
720 pg_catalog.pg_description DESCRIPTION
721 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
722 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
723 let (state, data) = self.query(sql.as_str());
724 let mut list = object! {};
725 if state {
726 for item in data.members() {
727 let mut row = object! {};
728 row["field"] = item["column_name"].clone();
729 row["comment"] = item["comment"].clone();
730 row["type"] = item["data_type"].clone();
731 row["is_nullable"] = item["is_nullable"].clone();
732 row["max_length"] = item["character_maximum_length"].clone();
733 row["numeric_precision"] = item["numeric_precision"].clone();
734 row["numeric_scale"] = item["numeric_scale"].clone();
735 if let Some(field_name) = row["field"].as_str() {
736 list[field_name] = row.clone();
737 }
738 }
739 list
740 } else {
741 list
742 }
743 }
744
745 fn table_is_exist(&mut self, name: &str) -> bool {
746 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
747 let (state, data) = self.query(sql.as_str());
748 match state {
749 true => {
750 for item in data.members() {
751 if item.has_key("exists") {
752 return item["exists"].as_bool().unwrap_or(false);
753 }
754 }
755 false
756 }
757 false => false,
758 }
759 }
760
761 fn table(&mut self, name: &str) -> &mut Pgsql {
762 self.params = Params::default(self.connection.mode.str().as_str());
763 let table_name = format!("{}{}", self.connection.prefix, name);
764 if !super::sql_safety::validate_table_name(&table_name) {
765 error!("Invalid table name: {}", name);
766 }
767 self.params.table = table_name.clone();
768 self.params.join_table = table_name;
769 self
770 }
771
772 fn change_table(&mut self, name: &str) -> &mut Self {
773 self.params.join_table = name.to_string();
774 self
775 }
776
777 fn autoinc(&mut self) -> &mut Self {
778 self.params.autoinc = true;
779 self
780 }
781
782 fn timestamps(&mut self) -> &mut Self {
783 self.params.timestamps = true;
784 self
785 }
786
787 fn fetch_sql(&mut self) -> &mut Self {
788 self.params.sql = true;
789 self
790 }
791
792 fn order(&mut self, field: &str, by: bool) -> &mut Self {
793 self.params.order[field] = {
794 if by {
795 "DESC"
796 } else {
797 "ASC"
798 }
799 }
800 .into();
801 self
802 }
803
804 fn group(&mut self, field: &str) -> &mut Self {
805 let fields: Vec<&str> = field.split(",").collect();
806 for field in fields.iter() {
807 let field = field.to_string();
808 self.params.group[field.as_str()] = field.clone().into();
809 self.params.fields[field.as_str()] = field.clone().into();
810 }
811 self
812 }
813
814 fn distinct(&mut self) -> &mut Self {
815 self.params.distinct = true;
816 self
817 }
818
819 fn json(&mut self, field: &str) -> &mut Self {
820 let list: Vec<&str> = field.split(",").collect();
821 for item in list.iter() {
822 self.params.json[item.to_string().as_str()] = item.to_string().into();
823 }
824 self
825 }
826
827 fn location(&mut self, field: &str) -> &mut Self {
828 let list: Vec<&str> = field.split(",").collect();
829 for item in list.iter() {
830 self.params.location[item.to_string().as_str()] = item.to_string().into();
831 }
832 self
833 }
834
835 fn field(&mut self, field: &str) -> &mut Self {
836 let list: Vec<&str> = field.split(",").collect();
837 let join_table = if self.params.join_table.is_empty() {
838 self.params.table.clone()
839 } else {
840 self.params.join_table.clone()
841 };
842 for item in list.iter() {
843 let lower = item.to_lowercase();
844 let is_expr = lower.contains("count(")
845 || lower.contains("sum(")
846 || lower.contains("avg(")
847 || lower.contains("max(")
848 || lower.contains("min(")
849 || lower.contains("case ");
850 if is_expr {
851 self.params.fields[item.to_string().as_str()] = (*item).into();
852 } else if item.contains(" as ") {
853 let text = item.split(" as ").collect::<Vec<&str>>();
854 self.params.fields[item.to_string().as_str()] =
855 format!("{}.{} as {}", join_table, text[0], text[1]).into();
856 } else {
857 self.params.fields[item.to_string().as_str()] =
858 format!("{join_table}.{item}").into();
859 }
860 }
861 self
862 }
863
864 fn field_raw(&mut self, expr: &str) -> &mut Self {
865 self.params.fields[expr] = expr.into();
866 self
867 }
868
869 fn hidden(&mut self, name: &str) -> &mut Self {
870 let hidden: Vec<&str> = name.split(",").collect();
871
872 let fields_list = self.table_info(self.params.clone().table.as_str());
873 let mut data = array![];
874 for item in fields_list.members() {
875 let _ = data.push(object! {
876 "name":item["field"].as_str().unwrap_or("")
877 });
878 }
879
880 for item in data.members() {
881 let name = item["name"].as_str().unwrap_or("");
882 if !hidden.contains(&name) {
883 self.params.fields[name] = name.into();
884 }
885 }
886 self
887 }
888
889 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
890 for f in field.split('|') {
891 if !super::sql_safety::validate_field_name(f) {
892 error!("Invalid field name: {}", f);
893 }
894 }
895 if !super::sql_safety::validate_compare_orator(compare) {
896 error!("Invalid compare operator: {}", compare);
897 }
898 let join_table = if self.params.join_table.is_empty() {
899 self.params.table.clone()
900 } else {
901 self.params.join_table.clone()
902 };
903 if value.is_boolean() {
904 let bool_val = value.as_bool().unwrap_or(false);
905 self.params
906 .where_and
907 .push(format!("{join_table}.{field} {compare} {bool_val}"));
908 return self;
909 }
910 match compare {
911 "between" => {
912 self.params.where_and.push(format!(
913 "{}.{} between '{}' AND '{}'",
914 join_table, field, value[0], value[1]
915 ));
916 }
917 "set" => {
918 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
919 let mut wheredata = vec![];
920 for item in list.iter() {
921 wheredata.push(format!(
922 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
923 ));
924 }
925 self.params
926 .where_and
927 .push(format!("({})", wheredata.join(" or ")));
928 }
929 "notin" => {
930 let mut text = String::new();
931 for item in value.members() {
932 text = format!("{text},'{item}'");
933 }
934 text = text.trim_start_matches(",").into();
935 self.params
936 .where_and
937 .push(format!("{join_table}.{field} not in ({text})"));
938 }
939 "is" => {
940 self.params
941 .where_and
942 .push(format!("{join_table}.{field} is {value}"));
943 }
944 "isnot" => {
945 self.params
946 .where_and
947 .push(format!("{join_table}.{field} is not {value}"));
948 }
949 "notlike" => {
950 self.params
951 .where_and
952 .push(format!("{join_table}.{field} not like '{value}'"));
953 }
954 "in" => {
955 if value.is_array() && value.is_empty() {
956 self.params.where_and.push("1=0".to_string());
957 return self;
958 }
959 let mut text = String::new();
960 if value.is_array() {
961 for item in value.members() {
962 text = format!("{text},'{item}'");
963 }
964 } else if value.is_null() {
965 text = format!("{text},null");
966 } else {
967 let value = value.as_str().unwrap_or("");
968
969 let value: Vec<&str> = value.split(",").collect();
970 for item in value.iter() {
971 text = format!("{text},'{item}'");
972 }
973 }
974 text = text.trim_start_matches(",").into();
975
976 self.params
977 .where_and
978 .push(format!("{join_table}.{field} {compare} ({text})"));
979 }
980 "json_contains" => {
984 if value.is_array() {
985 if value.is_empty() {
986 self.params.where_and.push("1=0".to_string());
987 } else {
988 let mut parts = vec![];
989 for item in value.members() {
990 let escaped = super::sql_safety::escape_string(&item.to_string());
991 parts.push(format!(
992 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
993 escaped
994 ));
995 }
996 self.params
997 .where_and
998 .push(format!("({})", parts.join(" OR ")));
999 }
1000 } else {
1001 let escaped = super::sql_safety::escape_string(&value.to_string());
1002 self.params.where_and.push(format!(
1003 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1004 escaped
1005 ));
1006 }
1007 }
1008 _ => {
1009 self.params
1010 .where_and
1011 .push(format!("{join_table}.{field} {compare} '{value}'"));
1012 }
1013 }
1014 self
1015 }
1016
1017 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1018 for f in field.split('|') {
1019 if !super::sql_safety::validate_field_name(f) {
1020 error!("Invalid field name: {}", f);
1021 }
1022 }
1023 if !super::sql_safety::validate_compare_orator(compare) {
1024 error!("Invalid compare operator: {}", compare);
1025 }
1026 let join_table = if self.params.join_table.is_empty() {
1027 self.params.table.clone()
1028 } else {
1029 self.params.join_table.clone()
1030 };
1031
1032 if value.is_boolean() {
1033 let bool_val = value.as_bool().unwrap_or(false);
1034 self.params
1035 .where_or
1036 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1037 return self;
1038 }
1039
1040 match compare {
1041 "between" => {
1042 self.params.where_or.push(format!(
1043 "{}.{} between '{}' AND '{}'",
1044 join_table, field, value[0], value[1]
1045 ));
1046 }
1047 "set" => {
1048 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1049 let mut wheredata = vec![];
1050 for item in list.iter() {
1051 wheredata.push(format!(
1052 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1053 ));
1054 }
1055 self.params
1056 .where_or
1057 .push(format!("({})", wheredata.join(" or ")));
1058 }
1059 "notin" => {
1060 let mut text = String::new();
1061 for item in value.members() {
1062 text = format!("{text},'{item}'");
1063 }
1064 text = text.trim_start_matches(",").into();
1065 self.params
1066 .where_or
1067 .push(format!("{join_table}.{field} not in ({text})"));
1068 }
1069 "is" => {
1070 self.params
1071 .where_or
1072 .push(format!("{join_table}.{field} is {value}"));
1073 }
1074 "isnot" => {
1075 self.params
1076 .where_or
1077 .push(format!("{join_table}.{field} is not {value}"));
1078 }
1079 "in" => {
1080 if value.is_array() && value.is_empty() {
1081 self.params.where_or.push("1=0".to_string());
1082 return self;
1083 }
1084 let mut text = String::new();
1085 if value.is_array() {
1086 for item in value.members() {
1087 text = format!("{text},'{item}'");
1088 }
1089 } else {
1090 let value = value.as_str().unwrap_or("");
1091 let value: Vec<&str> = value.split(",").collect();
1092 for item in value.iter() {
1093 text = format!("{text},'{item}'");
1094 }
1095 }
1096 text = text.trim_start_matches(",").into();
1097 self.params
1098 .where_or
1099 .push(format!("{join_table}.{field} {compare} ({text})"));
1100 }
1101 "json_contains" => {
1105 if value.is_array() {
1106 if value.is_empty() {
1107 self.params.where_or.push("1=0".to_string());
1108 } else {
1109 let mut parts = vec![];
1110 for item in value.members() {
1111 let escaped = super::sql_safety::escape_string(&item.to_string());
1112 parts.push(format!(
1113 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1114 escaped
1115 ));
1116 }
1117 self.params
1118 .where_or
1119 .push(format!("({})", parts.join(" OR ")));
1120 }
1121 } else {
1122 let escaped = super::sql_safety::escape_string(&value.to_string());
1123 self.params.where_or.push(format!(
1124 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1125 escaped
1126 ));
1127 }
1128 }
1129 _ => {
1130 self.params
1131 .where_or
1132 .push(format!("{join_table}.{field} {compare} '{value}'"));
1133 }
1134 }
1135 self
1136 }
1137
1138 fn where_raw(&mut self, expr: &str) -> &mut Self {
1139 self.params.where_and.push(expr.to_string());
1140 self
1141 }
1142
1143 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1144 self.params
1145 .where_and
1146 .push(format!("\"{field}\" IN ({sub_sql})"));
1147 self
1148 }
1149
1150 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1151 self.params
1152 .where_and
1153 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1154 self
1155 }
1156
1157 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1158 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1159 self
1160 }
1161
1162 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1163 self.params
1164 .where_and
1165 .push(format!("NOT EXISTS ({sub_sql})"));
1166 self
1167 }
1168
1169 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1170 self.params.where_column = format!(
1171 "{}.{} {} {}.{}",
1172 self.params.table, field_a, compare, self.params.table, field_b
1173 );
1174 self
1175 }
1176
1177 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1178 self.params
1179 .update_column
1180 .push(format!("{field_a} = {compare}"));
1181 self
1182 }
1183
1184 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1185 self.params.page = page;
1186 self.params.limit = limit;
1187 self
1188 }
1189
1190 fn limit(&mut self, count: i32) -> &mut Self {
1191 self.params.limit_only = count;
1192 self
1193 }
1194
1195 fn column(&mut self, field: &str) -> JsonValue {
1196 self.field(field);
1197 let sql = self.params.select_sql();
1198
1199 if self.params.sql {
1200 return JsonValue::from(sql);
1201 }
1202 let (state, data) = self.query(sql.as_str());
1203 match state {
1204 true => {
1205 let mut list = array![];
1206 for item in data.members() {
1207 if self.params.json[field].is_empty() {
1208 let _ = list.push(item[field].clone());
1209 } else {
1210 let data =
1211 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1212 let _ = list.push(data);
1213 }
1214 }
1215 list
1216 }
1217 false => {
1218 array![]
1219 }
1220 }
1221 }
1222
1223 fn count(&mut self) -> JsonValue {
1224 self.params.fields = json::object! {};
1225 self.params.fields["count"] = "count(*) as count".to_string().into();
1226 let sql = self.params.select_sql();
1227 if self.params.sql {
1228 return JsonValue::from(sql.clone());
1229 }
1230 let (state, data) = self.query(sql.as_str());
1231 if state {
1232 data[0]["count"].clone()
1233 } else {
1234 JsonValue::from(0)
1235 }
1236 }
1237
1238 fn max(&mut self, field: &str) -> JsonValue {
1239 self.params.fields[field] = format!("max({field}) as {field}").into();
1240 let sql = self.params.select_sql();
1241 if self.params.sql {
1242 return JsonValue::from(sql.clone());
1243 }
1244 let (state, data) = self.query(sql.as_str());
1245 if state {
1246 if data.len() > 1 {
1247 return data.clone();
1248 }
1249 data[0][field].clone()
1250 } else {
1251 JsonValue::from(0)
1252 }
1253 }
1254
1255 fn min(&mut self, field: &str) -> JsonValue {
1256 self.params.fields[field] = format!("min({field}) as {field}").into();
1257 let sql = self.params.select_sql();
1258 if self.params.sql {
1259 return JsonValue::from(sql.clone());
1260 }
1261 let (state, data) = self.query(sql.as_str());
1262 if state {
1263 if data.len() > 1 {
1264 return data;
1265 }
1266 data[0][field].clone()
1267 } else {
1268 JsonValue::from(0)
1269 }
1270 }
1271
1272 fn sum(&mut self, field: &str) -> JsonValue {
1273 self.params.fields[field] = format!("sum({field}) as {field}").into();
1274 let sql = self.params.select_sql();
1275 if self.params.sql {
1276 return JsonValue::from(sql.clone());
1277 }
1278 let (state, data) = self.query(sql.as_str());
1279 match state {
1280 true => {
1281 if data.len() > 1 {
1282 return data;
1283 }
1284 data[0][field].clone()
1285 }
1286 false => JsonValue::from(0),
1287 }
1288 }
1289
1290 fn avg(&mut self, field: &str) -> JsonValue {
1291 self.params.fields[field] = format!("avg({field}) as {field}").into();
1292 let sql = self.params.select_sql();
1293 if self.params.sql {
1294 return JsonValue::from(sql.clone());
1295 }
1296 let (state, data) = self.query(sql.as_str());
1297 if state {
1298 if data.len() > 1 {
1299 return data;
1300 }
1301 data[0][field].clone()
1302 } else {
1303 JsonValue::from(0)
1304 }
1305 }
1306
1307 fn having(&mut self, expr: &str) -> &mut Self {
1308 self.params.having.push(expr.to_string());
1309 self
1310 }
1311
1312 fn select(&mut self) -> JsonValue {
1313 let sql = self.params.select_sql();
1314 if self.params.sql {
1315 return JsonValue::from(sql.clone());
1316 }
1317 let (state, mut data) = self.query(sql.as_str());
1318 match state {
1319 true => {
1320 for (field, _) in self.params.json.entries() {
1321 for item in data.members_mut() {
1322 if !item[field].is_empty() {
1323 let json = item[field].to_string();
1324 item[field] = match json::parse(&json) {
1325 Ok(e) => e,
1326 Err(_) => JsonValue::from(json),
1327 };
1328 }
1329 }
1330 }
1331 data.clone()
1332 }
1333 false => array![],
1334 }
1335 }
1336
1337 fn find(&mut self) -> JsonValue {
1338 self.params.page = 1;
1339 self.params.limit = 1;
1340 let sql = self.params.select_sql();
1341 if self.params.sql {
1342 return JsonValue::from(sql.clone());
1343 }
1344 let (state, mut data) = self.query(sql.as_str());
1345 match state {
1346 true => {
1347 if data.is_empty() {
1348 return object! {};
1349 }
1350 for (field, _) in self.params.json.entries() {
1351 if !data[0][field].is_empty() {
1352 let json = data[0][field].to_string();
1353 let json = json::parse(&json).unwrap_or(array![]);
1354 data[0][field] = json;
1355 } else {
1356 data[0][field] = array![];
1357 }
1358 }
1359 data[0].clone()
1360 }
1361 false => {
1362 object! {}
1363 }
1364 }
1365 }
1366
1367 fn value(&mut self, field: &str) -> JsonValue {
1368 self.params.fields = object! {};
1369 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1370 self.params.page = 1;
1371 self.params.limit = 1;
1372 let sql = self.params.select_sql();
1373 if self.params.sql {
1374 return JsonValue::from(sql.clone());
1375 }
1376 let (state, mut data) = self.query(sql.as_str());
1377 match state {
1378 true => {
1379 for (field, _) in self.params.json.entries() {
1380 if !data[0][field].is_empty() {
1381 let json = data[0][field].to_string();
1382 let json = json::parse(&json).unwrap_or(array![]);
1383 data[0][field] = json;
1384 } else {
1385 data[0][field] = array![];
1386 }
1387 }
1388 data[0][field].clone()
1389 }
1390 false => {
1391 if self.connection.debug {
1392 info!("{data:?}");
1393 }
1394 JsonValue::Null
1395 }
1396 }
1397 }
1398
1399 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1400 let fields_list = self.table_info(&self.params.table.clone());
1401 let mut fields = vec![];
1402 let mut values = vec![];
1403 if !self.params.autoinc && data["id"].is_empty() {
1404 let thread_id = format!("{:?}", std::thread::current().id());
1405 let thread_num: u64 = thread_id
1406 .trim_start_matches("ThreadId(")
1407 .trim_end_matches(")")
1408 .parse()
1409 .unwrap_or(0);
1410 data["id"] = format!(
1411 "{:X}{:X}",
1412 Local::now().timestamp_nanos_opt().unwrap_or(0),
1413 thread_num
1414 )
1415 .into();
1416 }
1417 for (field, value) in data.entries() {
1418 fields.push(format!("\"{}\"", field));
1419
1420 if value.is_string() {
1421 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1422 continue;
1423 } else if value.is_array() {
1424 if self.params.json[field].is_empty() {
1425 let array = value
1426 .members()
1427 .map(|x| x.as_str().unwrap_or(""))
1428 .collect::<Vec<&str>>()
1429 .join(",");
1430 values.push(format!("'{array}'"));
1431 } else {
1432 let json = value.to_string();
1433 let json = json.replace("'", "''");
1434 values.push(format!("'{json}'"));
1435 }
1436 continue;
1437 } else if value.is_object() {
1438 if self.params.json[field].is_empty() {
1439 values.push(format!("'{value}'"));
1440 } else {
1441 let json = value.to_string();
1442 let json = json.replace("'", "''");
1443 values.push(format!("'{json}'"));
1444 }
1445 continue;
1446 } else if value.is_number() {
1447 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1448 if col_type == "boolean" {
1449 let bool_val = value.as_i64().unwrap_or(0) != 0;
1450 values.push(format!("{bool_val}"));
1451 } else if col_type.contains("int") {
1452 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1453 } else {
1454 values.push(format!("{value}"));
1455 }
1456 continue;
1457 } else if value.is_boolean() || value.is_null() {
1458 values.push(format!("{value}"));
1459 continue;
1460 } else {
1461 values.push(format!("'{value}'"));
1462 continue;
1463 }
1464 }
1465 let fields = fields.join(",");
1466 let values = values.join(",");
1467
1468 let sql = format!(
1469 "INSERT INTO {} ({}) VALUES ({});",
1470 self.params.table, fields, values
1471 );
1472 if self.params.sql {
1473 return JsonValue::from(sql.clone());
1474 }
1475 let (state, ids) = self.execute(sql.as_str());
1476
1477 match state {
1478 true => match self.params.autoinc {
1479 true => ids.clone(),
1480 false => data["id"].clone(),
1481 },
1482 false => {
1483 let thread_id = format!("{:?}", thread::current().id());
1484 error!("添加失败: {thread_id} {ids:?} {sql}");
1485 JsonValue::from("")
1486 }
1487 }
1488 }
1489 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1490 let fields_list = self.table_info(&self.params.table.clone());
1491 let mut fields = String::new();
1492 if !self.params.autoinc && data[0]["id"].is_empty() {
1493 data[0]["id"] = "".into();
1494 }
1495 for (field, _) in data[0].entries() {
1496 fields = format!("{fields},\"{field}\"");
1497 }
1498 fields = fields.trim_start_matches(",").to_string();
1499
1500 let core_count = num_cpus::get();
1501 let mut p = pools::Pool::new(core_count * 4);
1502
1503 let autoinc = self.params.autoinc;
1504 for list in data.members() {
1505 let mut item = list.clone();
1506 let i = br_fields::str::Code::verification_code(3);
1507 let fields_list_new = fields_list.clone();
1508 p.execute(move |pcindex| {
1509 if !autoinc && item["id"].is_empty() {
1510 let id = format!(
1511 "{:X}{:X}{}",
1512 Local::now().timestamp_nanos_opt().unwrap_or(0),
1513 pcindex,
1514 i
1515 );
1516 item["id"] = id.into();
1517 }
1518 let mut values = "".to_string();
1519 for (field, value) in item.entries() {
1520 if value.is_string() {
1521 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1522 } else if value.is_number() {
1523 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1524 if col_type == "boolean" {
1525 let bool_val = value.as_i64().unwrap_or(0) != 0;
1526 values = format!("{values},{bool_val}");
1527 } else if col_type.contains("int") {
1528 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1529 } else {
1530 values = format!("{values},{value}");
1531 }
1532 } else if value.is_boolean() {
1533 values = format!("{values},{value}");
1534 continue;
1535 } else {
1536 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1537 }
1538 }
1539 values = format!("({})", values.trim_start_matches(","));
1540 array![item["id"].clone(), values]
1541 });
1542 }
1543 let (ids_list, mut values) = p.insert_all();
1544 values = values.trim_start_matches(",").to_string();
1545 let sql = format!(
1546 "INSERT INTO {} ({}) VALUES {};",
1547 self.params.table, fields, values
1548 );
1549
1550 if self.params.sql {
1551 return JsonValue::from(sql.clone());
1552 }
1553 let (state, data) = self.execute(sql.as_str());
1554 match state {
1555 true => match autoinc {
1556 true => data,
1557 false => JsonValue::from(ids_list),
1558 },
1559 false => {
1560 error!("insert_all: {data:?}");
1561 let _ = std::fs::write("pgsql_insert_all_error.log", format!("insert_all error:\n{:?}\n\nSQL:\n{}", data, sql));
1562 array![]
1563 }
1564 }
1565 }
1566 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1567 let fields_list = self.table_info(&self.params.table.clone());
1568 let mut fields = vec![];
1569 let mut values = vec![];
1570 if !self.params.autoinc && data["id"].is_empty() {
1571 let thread_id = format!("{:?}", std::thread::current().id());
1572 let thread_num: u64 = thread_id
1573 .trim_start_matches("ThreadId(")
1574 .trim_end_matches(")")
1575 .parse()
1576 .unwrap_or(0);
1577 data["id"] = format!(
1578 "{:X}{:X}",
1579 Local::now().timestamp_nanos_opt().unwrap_or(0),
1580 thread_num
1581 )
1582 .into();
1583 }
1584 for (field, value) in data.entries() {
1585 fields.push(format!("\"{}\"", field));
1586
1587 if value.is_string() {
1588 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1589 continue;
1590 } else if value.is_array() {
1591 if self.params.json[field].is_empty() {
1592 let array = value
1593 .members()
1594 .map(|x| x.as_str().unwrap_or(""))
1595 .collect::<Vec<&str>>()
1596 .join(",");
1597 values.push(format!("'{array}'"));
1598 } else {
1599 let json = value.to_string();
1600 let json = json.replace("'", "''");
1601 values.push(format!("'{json}'"));
1602 }
1603 continue;
1604 } else if value.is_object() {
1605 if self.params.json[field].is_empty() {
1606 values.push(format!("'{value}'"));
1607 } else {
1608 let json = value.to_string();
1609 let json = json.replace("'", "''");
1610 values.push(format!("'{json}'"));
1611 }
1612 continue;
1613 } else if value.is_number() {
1614 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1615 if col_type == "boolean" {
1616 let bool_val = value.as_i64().unwrap_or(0) != 0;
1617 values.push(format!("{bool_val}"));
1618 } else if col_type.contains("int") {
1619 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1620 } else {
1621 values.push(format!("{value}"));
1622 }
1623 continue;
1624 } else if value.is_boolean() || value.is_null() {
1625 values.push(format!("{value}"));
1626 continue;
1627 } else {
1628 values.push(format!("'{value}'"));
1629 continue;
1630 }
1631 }
1632
1633 let conflict_cols: Vec<String> = conflict_fields
1634 .iter()
1635 .map(|f| format!("\"{}\"", f))
1636 .collect();
1637
1638 let update_set: Vec<String> = fields
1639 .iter()
1640 .filter(|f| {
1641 let name = f.trim_matches('"');
1642 !conflict_fields.contains(&name) && name != "id"
1643 })
1644 .map(|f| format!("{f}=EXCLUDED.{f}"))
1645 .collect();
1646
1647 let fields_str = fields.join(",");
1648 let values_str = values.join(",");
1649
1650 let sql = format!(
1651 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1652 self.params.table,
1653 fields_str,
1654 values_str,
1655 conflict_cols.join(","),
1656 update_set.join(",")
1657 );
1658 if self.params.sql {
1659 return JsonValue::from(sql.clone());
1660 }
1661 let (state, result) = self.execute(sql.as_str());
1662 match state {
1663 true => match self.params.autoinc {
1664 true => result.clone(),
1665 false => data["id"].clone(),
1666 },
1667 false => {
1668 let thread_id = format!("{:?}", thread::current().id());
1669 error!("upsert失败: {thread_id} {result:?} {sql}");
1670 JsonValue::from("")
1671 }
1672 }
1673 }
1674 fn update(&mut self, data: JsonValue) -> JsonValue {
1675 let fields_list = self.table_info(&self.params.table.clone());
1676 let mut values = vec![];
1677 for (field, value) in data.entries() {
1678 if value.is_string() {
1679 values.push(format!(
1680 "\"{}\"='{}'",
1681 field,
1682 value.to_string().replace("'", "''")
1683 ));
1684 } else if value.is_number() {
1685 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1686 if col_type == "boolean" {
1687 let bool_val = value.as_i64().unwrap_or(0) != 0;
1688 values.push(format!("\"{field}\"= {bool_val}"));
1689 } else if col_type.contains("int") {
1690 values.push(format!(
1691 "\"{}\"= {}",
1692 field,
1693 value.as_f64().unwrap_or(0.0) as i64
1694 ));
1695 } else {
1696 values.push(format!("\"{field}\"= {value}"));
1697 }
1698 } else if value.is_array() {
1699 if self.params.json[field].is_empty() {
1700 let array = value
1701 .members()
1702 .map(|x| x.as_str().unwrap_or(""))
1703 .collect::<Vec<&str>>()
1704 .join(",");
1705 values.push(format!("\"{field}\"='{array}'"));
1706 } else {
1707 let json = value.to_string();
1708 let json = json.replace("'", "''");
1709 values.push(format!("\"{field}\"='{json}'"));
1710 }
1711 continue;
1712 } else if value.is_object() {
1713 if self.params.json[field].is_empty() {
1714 values.push(format!("\"{field}\"='{value}'"));
1715 } else {
1716 if value.is_empty() {
1717 values.push(format!("\"{field}\"=''"));
1718 continue;
1719 }
1720 let json = value.to_string();
1721 let json = json.replace("'", "''");
1722 values.push(format!("\"{field}\"='{json}'"));
1723 }
1724 continue;
1725 } else if value.is_boolean() {
1726 values.push(format!("\"{field}\"= {value}"));
1727 } else {
1728 values.push(format!("\"{field}\"=\"{value}\""));
1729 }
1730 }
1731
1732 for (field, value) in self.params.inc_dec.entries() {
1733 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1734 }
1735 if !self.params.update_column.is_empty() {
1736 values.extend(self.params.update_column.clone());
1737 }
1738 let values = values.join(",");
1739
1740 let sql = format!(
1741 "UPDATE {} SET {} {};",
1742 self.params.table.clone(),
1743 values,
1744 self.params.where_sql()
1745 );
1746 if self.params.sql {
1747 return JsonValue::from(sql.clone());
1748 }
1749 let (state, data) = self.execute(sql.as_str());
1750 if state {
1751 data
1752 } else {
1753 let thread_id = format!("{:?}", thread::current().id());
1754 error!("update: {thread_id} {data:?} {sql}");
1755 0.into()
1756 }
1757 }
1758 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1759 let fields_list = self.table_info(&self.params.table.clone());
1760 let mut values = vec![];
1761
1762 let mut ids = vec![];
1763 for (field, _) in data[0].entries() {
1764 if field == "id" {
1765 continue;
1766 }
1767 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1768 let mut fields = vec![];
1769 for row in data.members() {
1770 let value = row[field].clone();
1771 let id = row["id"].clone();
1772 ids.push(id.clone());
1773 if value.is_string() {
1774 fields.push(format!(
1775 "WHEN '{}' THEN '{}'",
1776 id,
1777 value.to_string().replace("'", "''")
1778 ));
1779 } else if value.is_array() || value.is_object() {
1780 if self.params.json[field].is_empty() {
1781 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1782 } else {
1783 let json = value.to_string();
1784 let json = json.replace("'", "''");
1785 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1786 }
1787 continue;
1788 } else if value.is_number() {
1789 if col_type == "boolean" {
1790 let bool_val = value.as_i64().unwrap_or(0) != 0;
1791 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1792 } else {
1793 fields.push(format!("WHEN '{id}' THEN {value}"));
1794 }
1795 } else if value.is_boolean() || value.is_null() {
1796 fields.push(format!("WHEN '{id}' THEN {value}"));
1797 } else {
1798 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1799 }
1800 }
1801 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1802 }
1803 self.where_and("id", "in", ids.into());
1804 for (field, value) in self.params.inc_dec.entries() {
1805 values.push(format!("{} = {}", field, value.to_string().clone()));
1806 }
1807
1808 let values = values.join(",");
1809 let sql = format!(
1810 "UPDATE {} SET {} {} {};",
1811 self.params.table.clone(),
1812 values,
1813 self.params.where_sql(),
1814 self.params.page_limit_sql()
1815 );
1816 if self.params.sql {
1817 return JsonValue::from(sql.clone());
1818 }
1819 let (state, data) = self.execute(sql.as_str());
1820 if state {
1821 data
1822 } else {
1823 error!("update_all: {data:?}");
1824 JsonValue::from(0)
1825 }
1826 }
1827
1828 fn delete(&mut self) -> JsonValue {
1829 let sql = format!(
1830 "delete FROM {} {} {};",
1831 self.params.table.clone(),
1832 self.params.where_sql(),
1833 self.params.page_limit_sql()
1834 );
1835 if self.params.sql {
1836 return JsonValue::from(sql.clone());
1837 }
1838 let (state, data) = self.execute(sql.as_str());
1839 match state {
1840 true => data,
1841 false => {
1842 error!("delete 失败>>> {data:?}");
1843 JsonValue::from(0)
1844 }
1845 }
1846 }
1847
1848 fn transaction(&mut self) -> bool {
1849 let thread_id = format!("{:?}", thread::current().id());
1850 let key = format!("{}{}", self.default, thread_id);
1851
1852 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1853 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1854 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1855 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1856 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1857 return true;
1858 }
1859
1860 let mut conn = None;
1862 for attempt in 0..3u8 {
1863 match self.client.get_connect_for_transaction() {
1864 Ok(mut c) => {
1865 if c.is_valid() {
1866 conn = Some(c);
1867 break;
1868 }
1869 warn!("事务连接无效(第{}次)", attempt + 1);
1870 self.client.release_transaction_conn();
1871 }
1872 Err(e) => {
1873 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1874 }
1875 }
1876 if attempt < 2 {
1877 thread::sleep(std::time::Duration::from_millis(200));
1878 }
1879 }
1880 let mut conn = match conn {
1881 Some(c) => c,
1882 None => {
1883 error!("获取事务连接重试耗尽");
1884 return false;
1885 }
1886 };
1887
1888 if let Err(e) = conn.execute("START TRANSACTION") {
1889 error!("启动事务失败: {e}");
1890 self.client.release_transaction_conn();
1891 return false;
1892 }
1893
1894
1895 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1896 true
1897 }
1898 fn commit(&mut self) -> bool {
1899 let thread_id = format!("{:?}", thread::current().id());
1900 let key = format!("{}{}", self.default, thread_id);
1901
1902 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1903 error!("commit: 没有活跃的事务");
1904 return false;
1905 }
1906
1907 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1908 if depth > 1 {
1909 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1910 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1911 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1912 return true;
1913 }
1914
1915 let commit_result =
1916 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1917
1918 let success = match commit_result {
1919 Some(Ok(_)) => true,
1920 Some(Err(e)) => {
1921 error!("提交事务失败: {e}");
1922 false
1923 }
1924 None => {
1925 error!("提交事务失败: 未找到连接");
1926 false
1927 }
1928 };
1929
1930 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1931 self.client.release_transaction_conn();
1932 success
1933 }
1934
1935 fn rollback(&mut self) -> bool {
1936 let thread_id = format!("{:?}", thread::current().id());
1937 let key = format!("{}{}", self.default, thread_id);
1938
1939 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1940 error!("rollback: 没有活跃的事务");
1941 return false;
1942 }
1943
1944 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1945 if depth > 1 {
1946 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1947 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1948 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1949 return true;
1950 }
1951
1952 let rollback_result =
1953 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1954
1955 let success = match rollback_result {
1956 Some(Ok(_)) => true,
1957 Some(Err(e)) => {
1958 error!("回滚失败: {e}");
1959 false
1960 }
1961 None => {
1962 error!("回滚失败: 未找到连接");
1963 false
1964 }
1965 };
1966
1967 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1968 self.client.release_transaction_conn();
1969 success
1970 }
1971
1972 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1973 let (state, data) = self.query(sql);
1974 match state {
1975 true => Ok(data),
1976 false => Err(data.to_string()),
1977 }
1978 }
1979
1980 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1981 let (state, data) = self.execute(sql);
1982 match state {
1983 true => Ok(data),
1984 false => Err(data.to_string()),
1985 }
1986 }
1987
1988 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1989 self.params.inc_dec[field] = format!("{field} + {num}").into();
1990 self
1991 }
1992
1993 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1994 self.params.inc_dec[field] = format!("{field} - {num}").into();
1995 self
1996 }
1997 fn buildsql(&mut self) -> String {
1998 self.fetch_sql();
1999 let sql = self.select().to_string();
2000 format!("( {} ) {}", sql, self.params.table)
2001 }
2002
2003 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2004 for field in fields {
2005 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2006 }
2007 self
2008 }
2009
2010 fn join(
2011 &mut self,
2012 main_table: &str,
2013 main_fields: &str,
2014 right_table: &str,
2015 right_fields: &str,
2016 ) -> &mut Self {
2017 let main_table = if main_table.is_empty() {
2018 self.params.table.clone()
2019 } else {
2020 main_table.to_string()
2021 };
2022 self.params.join_table = right_table.to_string();
2023 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2024 self
2025 }
2026
2027 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2028 let main_fields = if main_fields.is_empty() {
2029 "id"
2030 } else {
2031 main_fields
2032 };
2033 let second_fields = if second_fields.is_empty() {
2034 self.params.table.clone()
2035 } else {
2036 second_fields.to_string().clone()
2037 };
2038 let sec_table_name = format!("{}{}", table, "_2");
2039 let second_table = format!("{} {}", table, sec_table_name.clone());
2040 self.params.join_table = sec_table_name.clone();
2041 self.params.join.push(format!(
2042 " INNER JOIN {} ON {}.{} = {}.{}",
2043 second_table, self.params.table, main_fields, sec_table_name, second_fields
2044 ));
2045 self
2046 }
2047
2048 fn join_right(
2049 &mut self,
2050 main_table: &str,
2051 main_fields: &str,
2052 right_table: &str,
2053 right_fields: &str,
2054 ) -> &mut Self {
2055 let main_table = if main_table.is_empty() {
2056 self.params.table.clone()
2057 } else {
2058 main_table.to_string()
2059 };
2060 self.params.join_table = right_table.to_string();
2061 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2062 self
2063 }
2064
2065 fn join_full(
2066 &mut self,
2067 main_table: &str,
2068 main_fields: &str,
2069 right_table: &str,
2070 right_fields: &str,
2071 ) -> &mut Self {
2072 let main_table = if main_table.is_empty() {
2073 self.params.table.clone()
2074 } else {
2075 main_table.to_string()
2076 };
2077 self.params.join_table = right_table.to_string();
2078 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2079 self
2080 }
2081
2082 fn union(&mut self, sub_sql: &str) -> &mut Self {
2083 self.params.unions.push(format!("UNION {sub_sql}"));
2084 self
2085 }
2086
2087 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2088 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2089 self
2090 }
2091
2092 fn lock_for_update(&mut self) -> &mut Self {
2093 self.params.lock_mode = "FOR UPDATE".to_string();
2094 self
2095 }
2096
2097 fn lock_for_share(&mut self) -> &mut Self {
2098 self.params.lock_mode = "FOR SHARE".to_string();
2099 self
2100 }
2101}