1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
11#[derive(Clone)]
12pub struct Pgsql {
13 pub connection: Connection,
15 pub default: String,
17 pub params: Params,
18 pub client: Pools,
19}
20
21impl Pgsql {
22 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23 let port = connection
24 .hostport
25 .parse::<i32>()
26 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28 let cp_connection = connection.clone();
29 let config = object! {
30 debug: cp_connection.debug,
31 username: cp_connection.username,
32 userpass: cp_connection.userpass,
33 database: cp_connection.database,
34 hostname: cp_connection.hostname,
35 hostport: port,
36 charset: cp_connection.charset.str(),
37 };
38 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40 let pools = pgsql.pools()?;
41 Ok(Self {
42 connection,
43 default: default.clone(),
44 params: Params::default("pgsql"),
45 client: pools,
46 })
47 }
48
49 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
50 let thread_id = format!("{:?}", thread::current().id());
51 let key = format!("{}{}", self.default, thread_id);
52
53 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
54 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
55
56 match result {
57 Some(Ok(e)) => {
58 if self.connection.debug {
59 info!("查询成功: {} {}", thread_id.clone(), sql);
60 }
61 (true, e.rows)
62 }
63 Some(Err(e)) => {
64 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
65 (false, JsonValue::from(e.to_string()))
66 }
67 None => {
68 error!("事务查询失败: 未找到事务连接 {thread_id}");
69 (false, JsonValue::from("未找到事务连接"))
70 }
71 }
72 } else {
73 let mut guard = match self.client.get_guard() {
74 Ok(g) => g,
75 Err(e) => {
76 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
78 return (false, JsonValue::from(e.to_string()));
79 }
80 };
81 match guard.conn().query(sql) {
82 Ok(e) => {
83 if self.connection.debug {
84 info!("查询成功: {} {}", thread_id.clone(), sql);
85 }
86 (true, e.rows)
87 }
88 Err(ref e) if Self::is_retriable_error(e) => {
89 guard.discard();
92 self.client.flush_idle();
94 warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
95 thread::sleep(std::time::Duration::from_millis(200));
96 let mut guard2 = match self.client.get_guard() {
97 Ok(g) => g,
98 Err(e) => {
99 error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
100 return (false, JsonValue::from(e.to_string()));
101 }
102 };
103 match guard2.conn().query(sql) {
104 Ok(e) => {
105 if self.connection.debug {
106 info!("查询成功(重试): {} {}", thread_id.clone(), sql);
107 }
108 (true, e.rows)
109 }
110 Err(e) => {
111 error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
112 (false, JsonValue::from(e.to_string()))
113 }
114 }
115 }
116 Err(e) => {
117 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
118 (false, JsonValue::from(e.to_string()))
119 }
120 }
121 }
122 }
123 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
124 let thread_id = format!("{:?}", thread::current().id());
125 let key = format!("{}{}", self.default, thread_id);
126
127 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
128 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
129
130 match result {
131 Some(Ok(e)) => {
132 if self.connection.debug {
133 info!("提交成功: {} {}", thread_id.clone(), sql);
134 }
135 if sql.contains("INSERT") {
136 (true, e.rows)
137 } else {
138 (true, e.affect_count.into())
139 }
140 }
141 Some(Err(e)) => {
142 error!("事务提交失败: {thread_id} {e}");
143 (false, JsonValue::from(e.to_string()))
144 }
145 None => {
146 error!("事务执行失败: 未找到事务连接 {thread_id}");
147 (false, JsonValue::from("未找到事务连接"))
148 }
149 }
150 } else {
151 let mut guard = match self.client.get_guard() {
153 Ok(g) => g,
154 Err(e) => {
155 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
156 return (false, JsonValue::from(e.to_string()));
157 }
158 };
159 match guard.conn().execute(sql) {
160 Ok(e) => {
161 if self.connection.debug {
162 info!("提交成功: {} {}", thread_id.clone(), sql);
163 }
164 if sql.contains("INSERT") {
165 (true, e.rows)
166 } else {
167 (true, e.affect_count.into())
168 }
169 }
170 Err(ref e) if Self::is_retriable_error(e) => {
171 guard.discard();
174 self.client.flush_idle();
176 warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
177 thread::sleep(std::time::Duration::from_millis(200));
178 let mut guard2 = match self.client.get_guard() {
179 Ok(g) => g,
180 Err(e) => {
181 error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
182 return (false, JsonValue::from(e.to_string()));
183 }
184 };
185 match guard2.conn().execute(sql) {
186 Ok(e) => {
187 if self.connection.debug {
188 info!("提交成功(重试): {} {}", thread_id.clone(), sql);
189 }
190 if sql.contains("INSERT") {
191 (true, e.rows)
192 } else {
193 (true, e.affect_count.into())
194 }
195 }
196 Err(e) => {
197 error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
198 (false, JsonValue::from(e.to_string()))
199 }
200 }
201 }
202 Err(e) => {
203 error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
204 (false, JsonValue::from(e.to_string()))
205 }
206 }
207 }
208 }
209
210 fn is_retriable_error(e: &PgsqlError) -> bool {
212 matches!(
213 e,
214 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
215 )
216 }
217}
218
219impl DbMode for Pgsql {
220 fn database_tables(&mut self) -> JsonValue {
221 let sql = "SHOW TABLES".to_string();
222 match self.sql(sql.as_str()) {
223 Ok(e) => {
224 let mut list = vec![];
225 for item in e.members() {
226 for (_, value) in item.entries() {
227 list.push(value.clone());
228 }
229 }
230 list.into()
231 }
232 Err(_) => {
233 array![]
234 }
235 }
236 }
237
238 fn database_create(&mut self, name: &str) -> bool {
239 let sql = format!("CREATE DATABASE {name}");
240
241 let (state, data) = self.execute(sql.as_str());
242 match state {
243 true => data.as_bool().unwrap_or(true),
244 false => {
245 error!("创建数据库失败: {data:?}");
246 false
247 }
248 }
249 }
250
251 fn truncate(&mut self, table: &str) -> bool {
252 let sql = format!("TRUNCATE TABLE {table}");
253 let (state, _) = self.execute(sql.as_str());
254 state
255 }
256}
257
258impl Mode for Pgsql {
259 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
260 let mut sql = String::new();
261 let mut comments = vec![];
262
263 if !options.table_unique.is_empty() {
264 let full_name = format!(
265 "{}_unique_{}",
266 options.table_name,
267 options.table_unique.join("_")
268 );
269 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
270 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
271 let unique = format!(
272 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
273 name,
274 options.table_name,
275 options.table_unique.join(",")
276 );
277 comments.push(unique);
278 }
279
280 for row in options.table_index.iter() {
281 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
282 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
283 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
284 let index = format!(
285 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
286 name,
287 options.table_name,
288 row.join(",")
289 );
290 comments.push(index);
291 }
292
293 for (name, field) in options.table_fields.entries_mut() {
294 field["table_name"] = options.table_name.clone().into();
295 let row = br_fields::field("pgsql", name, field.clone());
296 let (col_sql, meta) = if let Some(idx) = row.find("--") {
297 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
298 } else {
299 (row.trim(), None)
300 };
301 if let Some(meta) = meta {
302 comments.push(format!(
303 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
304 options.table_name, name, meta
305 ));
306 }
307 sql = format!("{} {},\r\n", sql, col_sql);
308 }
309
310 let primary_key = format!(
311 "CONSTRAINT {}_{} PRIMARY KEY ({})",
312 options.table_name, options.table_key, options.table_key
313 );
314 let sql = format!(
315 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
316 options.table_name,
317 sql.trim_end_matches(",\r\n"),
318 primary_key
319 );
320 comments.insert(0, sql);
321
322 for (_name, field) in options.table_fields.entries() {
323 let _ = field["mode"].as_str();
324 }
325
326 if self.params.sql {
327 let info = comments.join("\r\n");
328 return JsonValue::from(info);
329 }
330 for comment in comments {
331 let (state, _) = self.execute(comment.as_str());
332 match state {
333 true => {}
334 false => {
335 return JsonValue::from(state);
336 }
337 }
338 }
339 JsonValue::from(true)
340 }
341
342 fn table_update(&mut self, options: TableOptions) -> JsonValue {
343 let fields_list = self.table_info(&options.table_name);
344 let mut put = vec![];
345 let mut add = vec![];
346 let mut del = vec![];
347 let mut comments = vec![];
348
349 for (key, _) in fields_list.entries() {
350 if options.table_fields[key].is_empty() {
351 del.push(key);
352 }
353 }
354 for (name, field) in options.table_fields.entries() {
355 if !fields_list[name].is_empty() {
356 let old_info = &fields_list[name];
357 let new_field_sql = br_fields::field("pgsql", name, field.clone());
358
359 let old_comment = old_info["comment"].as_str().unwrap_or("");
360 let old_type = old_info["type"].as_str().unwrap_or("");
361
362 let new_comment = if let Some(idx) = new_field_sql.find("--") {
363 new_field_sql[idx + 2..].trim()
364 } else {
365 ""
366 };
367
368 let comment_matches =
369 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
370 let old_parts: Vec<&str> = old_comment.split('|').collect();
371 let new_parts: Vec<&str> = new_comment.split('|').collect();
372 if old_parts.len() >= 4 && new_parts.len() >= 4 {
373 old_parts[..4] == new_parts[..4]
374 } else {
375 old_comment == new_comment
376 }
377 } else if !old_comment.is_empty() && !new_comment.is_empty() {
378 let old_parts: Vec<&str> = old_comment.split('|').collect();
379 let new_parts: Vec<&str> = new_comment.split('|').collect();
380 if old_parts.len() >= 2
381 && new_parts.len() >= 2
382 && old_parts.len() == new_parts.len()
383 {
384 let old_filtered: Vec<&str> = old_parts
385 .iter()
386 .filter(|v| **v != "true" && **v != "false")
387 .copied()
388 .collect();
389 let new_filtered: Vec<&str> = new_parts
390 .iter()
391 .filter(|v| **v != "true" && **v != "false")
392 .copied()
393 .collect();
394 old_filtered == new_filtered
395 } else {
396 old_comment == new_comment
397 }
398 } else {
399 old_comment == new_comment
400 };
401
402 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
403 let new_type = if sql_parts.len() > 1 {
404 sql_parts[1].to_lowercase()
405 } else {
406 String::new()
407 };
408
409 let type_matches = match old_type {
410 "integer" => {
411 new_type.contains("int")
412 && !new_type.contains("bigint")
413 && !new_type.contains("smallint")
414 }
415 "bigint" => new_type.contains("bigint"),
416 "smallint" => new_type.contains("smallint"),
417 "boolean" => new_type.contains("boolean"),
418 "text" => new_type.contains("text"),
419 "character varying" => {
420 if !new_type.contains("varchar") {
421 false
422 } else {
423 let old_len = old_info["max_length"].as_i64().unwrap_or(0);
424 let new_len = new_type
425 .trim_start_matches("varchar(")
426 .trim_end_matches(')')
427 .parse::<i64>()
428 .unwrap_or(0);
429 let matched = old_len == new_len || new_len == 0;
430 if !matched {
431 log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
432 }
433 old_len == new_len || new_len == 0
434 }
435 }
436 "character" => new_type.contains("char") && !new_type.contains("varchar"),
437 "numeric" => {
438 if !(new_type.contains("numeric") || new_type.contains("decimal")) {
439 false
440 } else {
441 let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
442 let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
443 let inner = new_type
444 .replace("numeric(", "")
445 .replace("decimal(", "")
446 .replace(')', "");
447 let parts: Vec<&str> = inner.split(',').collect();
448 let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
449 let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
450 old_prec == new_prec && old_scale == new_scale
451 }
452 }
453 "double precision" => {
454 new_type.contains("double") || new_type.contains("float8")
455 }
456 "real" => new_type.contains("real") || new_type.contains("float4"),
457 "timestamp without time zone" | "timestamp with time zone" => {
458 new_type.contains("timestamp")
459 }
460 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
461 "time without time zone" | "time with time zone" => {
462 new_type.contains("time") && !new_type.contains("timestamp")
463 }
464 "json" | "jsonb" => new_type.contains("json"),
465 "uuid" => new_type.contains("uuid"),
466 "bytea" => new_type.contains("bytea"),
467 _ => old_type == new_type,
468 };
469
470 if type_matches && comment_matches {
471 continue;
472 }
473
474 log::debug!(
475 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
476 options.table_name,
477 name,
478 type_matches,
479 old_type,
480 new_type,
481 comment_matches
482 );
483 put.push(name);
484 } else {
485 add.push(name);
486 }
487 }
488
489 for name in add.iter() {
490 let name = name.to_string();
491 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
492 let rows = row.split("--").collect::<Vec<&str>>();
493 comments.push(format!(
494 r#"ALTER TABLE "{}" add {};"#,
495 options.table_name,
496 rows[0].trim()
497 ));
498 if rows.len() > 1 {
499 comments.push(format!(
500 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
501 options.table_name,
502 name,
503 rows[1].trim()
504 ));
505 }
506 }
507 for name in del.iter() {
508 comments.push(format!(
509 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
510 options.table_name, name
511 ));
512 }
513 for name in put.iter() {
514 let name = name.to_string();
515 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
516 let rows = row.split("--").collect::<Vec<&str>>();
517
518 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
519
520 if sql[1].contains("BOOLEAN") {
521 let text = format!(
522 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
523 options.table_name, name
524 );
525 comments.push(text.clone());
526 let text = format!(
527 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
528 options.table_name, name, sql[1]
529 );
530 comments.push(text.clone());
531 } else {
532 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
533 let new_type_lower = sql[1].to_lowercase();
534 let is_date_to_numeric = (old_col_type == "date"
535 || old_col_type.contains("timestamp"))
536 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
537 if is_date_to_numeric {
538 comments.push(format!(
539 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
540 options.table_name, name
541 ));
542 comments.push(format!(
543 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
544 options.table_name, name, sql[1], name, name, name
545 ));
546 } else {
547 let text = format!(
548 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
549 options.table_name, name, sql[1]
550 );
551 comments.push(text.clone());
552 }
553 };
554
555 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
556 let default_value = rows[0][default_pos + 9..].trim();
557 if !default_value.is_empty() {
558 comments.push(format!(
559 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
560 options.table_name, name, default_value
561 ));
562 }
563 }
564 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
567 .as_str()
568 .unwrap_or("YES");
569 let old_is_required = old_is_nullable == "NO";
570
571 if old_is_required && name != options.table_key {
573 comments.push(format!(
574 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
575 options.table_name, name
576 ));
577 }
578
579 if rows.len() > 1 {
580 comments.push(format!(
581 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
582 options.table_name,
583 name,
584 rows[1].trim()
585 ));
586 }
587 }
588
589 let mut unique_new = vec![];
590 let mut index_new = vec![];
591 let mut primary_key = vec![];
592 let (_, index_list) = self.query(
593 format!(
594 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
595 options.table_name
596 )
597 .as_str(),
598 );
599 for item in index_list.members() {
600 let key_name = item["indexname"].as_str().unwrap_or("");
601 let indexdef = item["indexdef"].to_string();
602
603 if indexdef.contains(
604 format!(
605 "CREATE UNIQUE INDEX {}_{} ON",
606 options.table_name, options.table_key
607 )
608 .as_str(),
609 ) {
610 primary_key.push(key_name.to_string());
611 continue;
612 }
613 if indexdef.contains("CREATE UNIQUE INDEX") {
614 unique_new.push(key_name.to_string());
615 continue;
616 }
617 if indexdef.contains("CREATE INDEX") {
618 index_new.push(key_name.to_string());
619 continue;
620 }
621 }
622
623 if !options.table_unique.is_empty() {
624 let full_name = format!(
625 "{}_unique_{}",
626 options.table_name,
627 options.table_unique.join("_")
628 );
629 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
630 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
631 let unique = format!(
632 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
633 name,
634 options.table_name,
635 options.table_unique.join(",")
636 );
637 if !unique_new.contains(&name) {
638 comments.push(unique);
639 }
640 unique_new.retain(|x| *x != name);
641 }
642
643 for row in options.table_index.iter() {
644 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
645 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
646 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
647 let index = format!(
648 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
649 name,
650 options.table_name,
651 row.join(",")
652 );
653 if !index_new.contains(&name) {
654 comments.push(index);
655 }
656 index_new.retain(|x| *x != name);
657 }
658
659 for item in unique_new {
660 if item.ends_with("_pkey") {
661 continue;
662 }
663 if item.starts_with("unique_") {
664 comments.push(format!(
665 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
666 options.table_name,
667 item.clone()
668 ));
669 } else {
670 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
671 }
672 }
673 for item in index_new {
674 if item.ends_with("_pkey") {
675 continue;
676 }
677 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
678 }
679
680 if self.params.sql {
681 return JsonValue::from(comments.join(""));
682 }
683
684 if comments.is_empty() {
685 return JsonValue::from(-1);
686 }
687
688 for item in comments.iter() {
689 let (state, res) = self.execute(item.as_str());
690 match state {
691 true => {}
692 false => {
693 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
694 return JsonValue::from(0);
695 }
696 }
697 }
698 JsonValue::from(1)
699 }
700
701 fn table_info(&mut self, table: &str) -> JsonValue {
702 let sql = format!(
703 "SELECT COL.COLUMN_NAME,
704 COL.DATA_TYPE,
705 COL.IS_NULLABLE,
706 COL.CHARACTER_MAXIMUM_LENGTH,
707 COL.NUMERIC_PRECISION,
708 COL.NUMERIC_SCALE,
709 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
710 LEFT JOIN
711 pg_catalog.pg_description DESCRIPTION
712 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
713 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
714 let (state, data) = self.query(sql.as_str());
715 let mut list = object! {};
716 if state {
717 for item in data.members() {
718 let mut row = object! {};
719 row["field"] = item["column_name"].clone();
720 row["comment"] = item["comment"].clone();
721 row["type"] = item["data_type"].clone();
722 row["is_nullable"] = item["is_nullable"].clone();
723 row["max_length"] = item["character_maximum_length"].clone();
724 row["numeric_precision"] = item["numeric_precision"].clone();
725 row["numeric_scale"] = item["numeric_scale"].clone();
726 if let Some(field_name) = row["field"].as_str() {
727 list[field_name] = row.clone();
728 }
729 }
730 list
731 } else {
732 list
733 }
734 }
735
736 fn table_is_exist(&mut self, name: &str) -> bool {
737 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
738 let (state, data) = self.query(sql.as_str());
739 match state {
740 true => {
741 for item in data.members() {
742 if item.has_key("exists") {
743 return item["exists"].as_bool().unwrap_or(false);
744 }
745 }
746 false
747 }
748 false => false,
749 }
750 }
751
752 fn table(&mut self, name: &str) -> &mut Pgsql {
753 self.params = Params::default(self.connection.mode.str().as_str());
754 let table_name = format!("{}{}", self.connection.prefix, name);
755 if !super::sql_safety::validate_table_name(&table_name) {
756 error!("Invalid table name: {}", name);
757 }
758 self.params.table = table_name.clone();
759 self.params.join_table = table_name;
760 self
761 }
762
763 fn change_table(&mut self, name: &str) -> &mut Self {
764 self.params.join_table = name.to_string();
765 self
766 }
767
768 fn autoinc(&mut self) -> &mut Self {
769 self.params.autoinc = true;
770 self
771 }
772
773 fn timestamps(&mut self) -> &mut Self {
774 self.params.timestamps = true;
775 self
776 }
777
778 fn fetch_sql(&mut self) -> &mut Self {
779 self.params.sql = true;
780 self
781 }
782
783 fn order(&mut self, field: &str, by: bool) -> &mut Self {
784 self.params.order[field] = {
785 if by {
786 "DESC"
787 } else {
788 "ASC"
789 }
790 }
791 .into();
792 self
793 }
794
795 fn group(&mut self, field: &str) -> &mut Self {
796 let fields: Vec<&str> = field.split(",").collect();
797 for field in fields.iter() {
798 let field = field.to_string();
799 self.params.group[field.as_str()] = field.clone().into();
800 self.params.fields[field.as_str()] = field.clone().into();
801 }
802 self
803 }
804
805 fn distinct(&mut self) -> &mut Self {
806 self.params.distinct = true;
807 self
808 }
809
810 fn json(&mut self, field: &str) -> &mut Self {
811 let list: Vec<&str> = field.split(",").collect();
812 for item in list.iter() {
813 self.params.json[item.to_string().as_str()] = item.to_string().into();
814 }
815 self
816 }
817
818 fn location(&mut self, field: &str) -> &mut Self {
819 let list: Vec<&str> = field.split(",").collect();
820 for item in list.iter() {
821 self.params.location[item.to_string().as_str()] = item.to_string().into();
822 }
823 self
824 }
825
826 fn field(&mut self, field: &str) -> &mut Self {
827 let list: Vec<&str> = field.split(",").collect();
828 let join_table = if self.params.join_table.is_empty() {
829 self.params.table.clone()
830 } else {
831 self.params.join_table.clone()
832 };
833 for item in list.iter() {
834 let lower = item.to_lowercase();
835 let is_expr = lower.contains("count(")
836 || lower.contains("sum(")
837 || lower.contains("avg(")
838 || lower.contains("max(")
839 || lower.contains("min(")
840 || lower.contains("case ");
841 if is_expr {
842 self.params.fields[item.to_string().as_str()] = (*item).into();
843 } else if item.contains(" as ") {
844 let text = item.split(" as ").collect::<Vec<&str>>();
845 self.params.fields[item.to_string().as_str()] =
846 format!("{}.{} as {}", join_table, text[0], text[1]).into();
847 } else {
848 self.params.fields[item.to_string().as_str()] =
849 format!("{join_table}.{item}").into();
850 }
851 }
852 self
853 }
854
855 fn field_raw(&mut self, expr: &str) -> &mut Self {
856 self.params.fields[expr] = expr.into();
857 self
858 }
859
860 fn hidden(&mut self, name: &str) -> &mut Self {
861 let hidden: Vec<&str> = name.split(",").collect();
862
863 let fields_list = self.table_info(self.params.clone().table.as_str());
864 let mut data = array![];
865 for item in fields_list.members() {
866 let _ = data.push(object! {
867 "name":item["field"].as_str().unwrap_or("")
868 });
869 }
870
871 for item in data.members() {
872 let name = item["name"].as_str().unwrap_or("");
873 if !hidden.contains(&name) {
874 self.params.fields[name] = name.into();
875 }
876 }
877 self
878 }
879
880 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
881 for f in field.split('|') {
882 if !super::sql_safety::validate_field_name(f) {
883 error!("Invalid field name: {}", f);
884 }
885 }
886 if !super::sql_safety::validate_compare_orator(compare) {
887 error!("Invalid compare operator: {}", compare);
888 }
889 let join_table = if self.params.join_table.is_empty() {
890 self.params.table.clone()
891 } else {
892 self.params.join_table.clone()
893 };
894 if value.is_boolean() {
895 let bool_val = value.as_bool().unwrap_or(false);
896 self.params
897 .where_and
898 .push(format!("{join_table}.{field} {compare} {bool_val}"));
899 return self;
900 }
901 match compare {
902 "between" => {
903 self.params.where_and.push(format!(
904 "{}.{} between '{}' AND '{}'",
905 join_table, field, value[0], value[1]
906 ));
907 }
908 "set" => {
909 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
910 let mut wheredata = vec![];
911 for item in list.iter() {
912 wheredata.push(format!(
913 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
914 ));
915 }
916 self.params
917 .where_and
918 .push(format!("({})", wheredata.join(" or ")));
919 }
920 "notin" => {
921 let mut text = String::new();
922 for item in value.members() {
923 text = format!("{text},'{item}'");
924 }
925 text = text.trim_start_matches(",").into();
926 self.params
927 .where_and
928 .push(format!("{join_table}.{field} not in ({text})"));
929 }
930 "is" => {
931 self.params
932 .where_and
933 .push(format!("{join_table}.{field} is {value}"));
934 }
935 "isnot" => {
936 self.params
937 .where_and
938 .push(format!("{join_table}.{field} is not {value}"));
939 }
940 "notlike" => {
941 self.params
942 .where_and
943 .push(format!("{join_table}.{field} not like '{value}'"));
944 }
945 "in" => {
946 if value.is_array() && value.is_empty() {
947 self.params.where_and.push("1=0".to_string());
948 return self;
949 }
950 let mut text = String::new();
951 if value.is_array() {
952 for item in value.members() {
953 text = format!("{text},'{item}'");
954 }
955 } else if value.is_null() {
956 text = format!("{text},null");
957 } else {
958 let value = value.as_str().unwrap_or("");
959
960 let value: Vec<&str> = value.split(",").collect();
961 for item in value.iter() {
962 text = format!("{text},'{item}'");
963 }
964 }
965 text = text.trim_start_matches(",").into();
966
967 self.params
968 .where_and
969 .push(format!("{join_table}.{field} {compare} ({text})"));
970 }
971 "json_contains" => {
975 if value.is_array() {
976 if value.is_empty() {
977 self.params.where_and.push("1=0".to_string());
978 } else {
979 let mut parts = vec![];
980 for item in value.members() {
981 let escaped = super::sql_safety::escape_string(&item.to_string());
982 parts.push(format!(
983 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
984 escaped
985 ));
986 }
987 self.params
988 .where_and
989 .push(format!("({})", parts.join(" OR ")));
990 }
991 } else {
992 let escaped = super::sql_safety::escape_string(&value.to_string());
993 self.params.where_and.push(format!(
994 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
995 escaped
996 ));
997 }
998 }
999 _ => {
1000 self.params
1001 .where_and
1002 .push(format!("{join_table}.{field} {compare} '{value}'"));
1003 }
1004 }
1005 self
1006 }
1007
1008 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1009 for f in field.split('|') {
1010 if !super::sql_safety::validate_field_name(f) {
1011 error!("Invalid field name: {}", f);
1012 }
1013 }
1014 if !super::sql_safety::validate_compare_orator(compare) {
1015 error!("Invalid compare operator: {}", compare);
1016 }
1017 let join_table = if self.params.join_table.is_empty() {
1018 self.params.table.clone()
1019 } else {
1020 self.params.join_table.clone()
1021 };
1022
1023 if value.is_boolean() {
1024 let bool_val = value.as_bool().unwrap_or(false);
1025 self.params
1026 .where_or
1027 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1028 return self;
1029 }
1030
1031 match compare {
1032 "between" => {
1033 self.params.where_or.push(format!(
1034 "{}.{} between '{}' AND '{}'",
1035 join_table, field, value[0], value[1]
1036 ));
1037 }
1038 "set" => {
1039 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1040 let mut wheredata = vec![];
1041 for item in list.iter() {
1042 wheredata.push(format!(
1043 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1044 ));
1045 }
1046 self.params
1047 .where_or
1048 .push(format!("({})", wheredata.join(" or ")));
1049 }
1050 "notin" => {
1051 let mut text = String::new();
1052 for item in value.members() {
1053 text = format!("{text},'{item}'");
1054 }
1055 text = text.trim_start_matches(",").into();
1056 self.params
1057 .where_or
1058 .push(format!("{join_table}.{field} not in ({text})"));
1059 }
1060 "is" => {
1061 self.params
1062 .where_or
1063 .push(format!("{join_table}.{field} is {value}"));
1064 }
1065 "isnot" => {
1066 self.params
1067 .where_or
1068 .push(format!("{join_table}.{field} is not {value}"));
1069 }
1070 "in" => {
1071 if value.is_array() && value.is_empty() {
1072 self.params.where_or.push("1=0".to_string());
1073 return self;
1074 }
1075 let mut text = String::new();
1076 if value.is_array() {
1077 for item in value.members() {
1078 text = format!("{text},'{item}'");
1079 }
1080 } else {
1081 let value = value.as_str().unwrap_or("");
1082 let value: Vec<&str> = value.split(",").collect();
1083 for item in value.iter() {
1084 text = format!("{text},'{item}'");
1085 }
1086 }
1087 text = text.trim_start_matches(",").into();
1088 self.params
1089 .where_or
1090 .push(format!("{join_table}.{field} {compare} ({text})"));
1091 }
1092 "json_contains" => {
1096 if value.is_array() {
1097 if value.is_empty() {
1098 self.params.where_or.push("1=0".to_string());
1099 } else {
1100 let mut parts = vec![];
1101 for item in value.members() {
1102 let escaped = super::sql_safety::escape_string(&item.to_string());
1103 parts.push(format!(
1104 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1105 escaped
1106 ));
1107 }
1108 self.params
1109 .where_or
1110 .push(format!("({})", parts.join(" OR ")));
1111 }
1112 } else {
1113 let escaped = super::sql_safety::escape_string(&value.to_string());
1114 self.params.where_or.push(format!(
1115 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1116 escaped
1117 ));
1118 }
1119 }
1120 _ => {
1121 self.params
1122 .where_or
1123 .push(format!("{join_table}.{field} {compare} '{value}'"));
1124 }
1125 }
1126 self
1127 }
1128
1129 fn where_raw(&mut self, expr: &str) -> &mut Self {
1130 self.params.where_and.push(expr.to_string());
1131 self
1132 }
1133
1134 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1135 self.params
1136 .where_and
1137 .push(format!("\"{field}\" IN ({sub_sql})"));
1138 self
1139 }
1140
1141 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1142 self.params
1143 .where_and
1144 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1145 self
1146 }
1147
1148 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1149 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1150 self
1151 }
1152
1153 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1154 self.params
1155 .where_and
1156 .push(format!("NOT EXISTS ({sub_sql})"));
1157 self
1158 }
1159
1160 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1161 self.params.where_column = format!(
1162 "{}.{} {} {}.{}",
1163 self.params.table, field_a, compare, self.params.table, field_b
1164 );
1165 self
1166 }
1167
1168 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1169 self.params
1170 .update_column
1171 .push(format!("{field_a} = {compare}"));
1172 self
1173 }
1174
1175 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1176 self.params.page = page;
1177 self.params.limit = limit;
1178 self
1179 }
1180
1181 fn limit(&mut self, count: i32) -> &mut Self {
1182 self.params.limit_only = count;
1183 self
1184 }
1185
1186 fn column(&mut self, field: &str) -> JsonValue {
1187 self.field(field);
1188 let sql = self.params.select_sql();
1189
1190 if self.params.sql {
1191 return JsonValue::from(sql);
1192 }
1193 let (state, data) = self.query(sql.as_str());
1194 match state {
1195 true => {
1196 let mut list = array![];
1197 for item in data.members() {
1198 if self.params.json[field].is_empty() {
1199 let _ = list.push(item[field].clone());
1200 } else {
1201 let data =
1202 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1203 let _ = list.push(data);
1204 }
1205 }
1206 list
1207 }
1208 false => {
1209 array![]
1210 }
1211 }
1212 }
1213
1214 fn count(&mut self) -> JsonValue {
1215 self.params.fields = json::object! {};
1216 self.params.fields["count"] = "count(*) as count".to_string().into();
1217 let sql = self.params.select_sql();
1218 if self.params.sql {
1219 return JsonValue::from(sql.clone());
1220 }
1221 let (state, data) = self.query(sql.as_str());
1222 if state {
1223 data[0]["count"].clone()
1224 } else {
1225 JsonValue::from(0)
1226 }
1227 }
1228
1229 fn max(&mut self, field: &str) -> JsonValue {
1230 self.params.fields[field] = format!("max({field}) as {field}").into();
1231 let sql = self.params.select_sql();
1232 if self.params.sql {
1233 return JsonValue::from(sql.clone());
1234 }
1235 let (state, data) = self.query(sql.as_str());
1236 if state {
1237 if data.len() > 1 {
1238 return data.clone();
1239 }
1240 data[0][field].clone()
1241 } else {
1242 JsonValue::from(0)
1243 }
1244 }
1245
1246 fn min(&mut self, field: &str) -> JsonValue {
1247 self.params.fields[field] = format!("min({field}) as {field}").into();
1248 let sql = self.params.select_sql();
1249 if self.params.sql {
1250 return JsonValue::from(sql.clone());
1251 }
1252 let (state, data) = self.query(sql.as_str());
1253 if state {
1254 if data.len() > 1 {
1255 return data;
1256 }
1257 data[0][field].clone()
1258 } else {
1259 JsonValue::from(0)
1260 }
1261 }
1262
1263 fn sum(&mut self, field: &str) -> JsonValue {
1264 self.params.fields[field] = format!("sum({field}) as {field}").into();
1265 let sql = self.params.select_sql();
1266 if self.params.sql {
1267 return JsonValue::from(sql.clone());
1268 }
1269 let (state, data) = self.query(sql.as_str());
1270 match state {
1271 true => {
1272 if data.len() > 1 {
1273 return data;
1274 }
1275 data[0][field].clone()
1276 }
1277 false => JsonValue::from(0),
1278 }
1279 }
1280
1281 fn avg(&mut self, field: &str) -> JsonValue {
1282 self.params.fields[field] = format!("avg({field}) as {field}").into();
1283 let sql = self.params.select_sql();
1284 if self.params.sql {
1285 return JsonValue::from(sql.clone());
1286 }
1287 let (state, data) = self.query(sql.as_str());
1288 if state {
1289 if data.len() > 1 {
1290 return data;
1291 }
1292 data[0][field].clone()
1293 } else {
1294 JsonValue::from(0)
1295 }
1296 }
1297
1298 fn having(&mut self, expr: &str) -> &mut Self {
1299 self.params.having.push(expr.to_string());
1300 self
1301 }
1302
1303 fn select(&mut self) -> JsonValue {
1304 let sql = self.params.select_sql();
1305 if self.params.sql {
1306 return JsonValue::from(sql.clone());
1307 }
1308 let (state, mut data) = self.query(sql.as_str());
1309 match state {
1310 true => {
1311 for (field, _) in self.params.json.entries() {
1312 for item in data.members_mut() {
1313 if !item[field].is_empty() {
1314 let json = item[field].to_string();
1315 item[field] = match json::parse(&json) {
1316 Ok(e) => e,
1317 Err(_) => JsonValue::from(json),
1318 };
1319 }
1320 }
1321 }
1322 data.clone()
1323 }
1324 false => array![],
1325 }
1326 }
1327
1328 fn find(&mut self) -> JsonValue {
1329 self.params.page = 1;
1330 self.params.limit = 1;
1331 let sql = self.params.select_sql();
1332 if self.params.sql {
1333 return JsonValue::from(sql.clone());
1334 }
1335 let (state, mut data) = self.query(sql.as_str());
1336 match state {
1337 true => {
1338 if data.is_empty() {
1339 return object! {};
1340 }
1341 for (field, _) in self.params.json.entries() {
1342 if !data[0][field].is_empty() {
1343 let json = data[0][field].to_string();
1344 let json = json::parse(&json).unwrap_or(array![]);
1345 data[0][field] = json;
1346 } else {
1347 data[0][field] = array![];
1348 }
1349 }
1350 data[0].clone()
1351 }
1352 false => {
1353 object! {}
1354 }
1355 }
1356 }
1357
1358 fn value(&mut self, field: &str) -> JsonValue {
1359 self.params.fields = object! {};
1360 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1361 self.params.page = 1;
1362 self.params.limit = 1;
1363 let sql = self.params.select_sql();
1364 if self.params.sql {
1365 return JsonValue::from(sql.clone());
1366 }
1367 let (state, mut data) = self.query(sql.as_str());
1368 match state {
1369 true => {
1370 for (field, _) in self.params.json.entries() {
1371 if !data[0][field].is_empty() {
1372 let json = data[0][field].to_string();
1373 let json = json::parse(&json).unwrap_or(array![]);
1374 data[0][field] = json;
1375 } else {
1376 data[0][field] = array![];
1377 }
1378 }
1379 data[0][field].clone()
1380 }
1381 false => {
1382 if self.connection.debug {
1383 info!("{data:?}");
1384 }
1385 JsonValue::Null
1386 }
1387 }
1388 }
1389
1390 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1391 let fields_list = self.table_info(&self.params.table.clone());
1392 let mut fields = vec![];
1393 let mut values = vec![];
1394 if !self.params.autoinc && data["id"].is_empty() {
1395 let thread_id = format!("{:?}", std::thread::current().id());
1396 let thread_num: u64 = thread_id
1397 .trim_start_matches("ThreadId(")
1398 .trim_end_matches(")")
1399 .parse()
1400 .unwrap_or(0);
1401 data["id"] = format!(
1402 "{:X}{:X}",
1403 Local::now().timestamp_nanos_opt().unwrap_or(0),
1404 thread_num
1405 )
1406 .into();
1407 }
1408 for (field, value) in data.entries() {
1409 fields.push(format!("\"{}\"", field));
1410
1411 if value.is_string() {
1412 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1413 continue;
1414 } else if value.is_array() {
1415 if self.params.json[field].is_empty() {
1416 let array = value
1417 .members()
1418 .map(|x| x.as_str().unwrap_or(""))
1419 .collect::<Vec<&str>>()
1420 .join(",");
1421 values.push(format!("'{array}'"));
1422 } else {
1423 let json = value.to_string();
1424 let json = json.replace("'", "''");
1425 values.push(format!("'{json}'"));
1426 }
1427 continue;
1428 } else if value.is_object() {
1429 if self.params.json[field].is_empty() {
1430 values.push(format!("'{value}'"));
1431 } else {
1432 let json = value.to_string();
1433 let json = json.replace("'", "''");
1434 values.push(format!("'{json}'"));
1435 }
1436 continue;
1437 } else if value.is_number() {
1438 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1439 if col_type == "boolean" {
1440 let bool_val = value.as_i64().unwrap_or(0) != 0;
1441 values.push(format!("{bool_val}"));
1442 } else if col_type.contains("int") {
1443 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1444 } else {
1445 values.push(format!("{value}"));
1446 }
1447 continue;
1448 } else if value.is_boolean() || value.is_null() {
1449 values.push(format!("{value}"));
1450 continue;
1451 } else {
1452 values.push(format!("'{value}'"));
1453 continue;
1454 }
1455 }
1456 let fields = fields.join(",");
1457 let values = values.join(",");
1458
1459 let sql = format!(
1460 "INSERT INTO {} ({}) VALUES ({});",
1461 self.params.table, fields, values
1462 );
1463 if self.params.sql {
1464 return JsonValue::from(sql.clone());
1465 }
1466 let (state, ids) = self.execute(sql.as_str());
1467
1468 match state {
1469 true => match self.params.autoinc {
1470 true => ids.clone(),
1471 false => data["id"].clone(),
1472 },
1473 false => {
1474 let thread_id = format!("{:?}", thread::current().id());
1475 error!("添加失败: {thread_id} {ids:?} {sql}");
1476 JsonValue::from("")
1477 }
1478 }
1479 }
1480 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1481 let fields_list = self.table_info(&self.params.table.clone());
1482 let mut fields = String::new();
1483 if !self.params.autoinc && data[0]["id"].is_empty() {
1484 data[0]["id"] = "".into();
1485 }
1486 for (field, _) in data[0].entries() {
1487 fields = format!("{fields},\"{field}\"");
1488 }
1489 fields = fields.trim_start_matches(",").to_string();
1490
1491 let core_count = num_cpus::get();
1492 let mut p = pools::Pool::new(core_count * 4);
1493
1494 let autoinc = self.params.autoinc;
1495 for list in data.members() {
1496 let mut item = list.clone();
1497 let i = br_fields::str::Code::verification_code(3);
1498 let fields_list_new = fields_list.clone();
1499 p.execute(move |pcindex| {
1500 if !autoinc && item["id"].is_empty() {
1501 let id = format!(
1502 "{:X}{:X}{}",
1503 Local::now().timestamp_nanos_opt().unwrap_or(0),
1504 pcindex,
1505 i
1506 );
1507 item["id"] = id.into();
1508 }
1509 let mut values = "".to_string();
1510 for (field, value) in item.entries() {
1511 if value.is_string() {
1512 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1513 } else if value.is_number() {
1514 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1515 if col_type == "boolean" {
1516 let bool_val = value.as_i64().unwrap_or(0) != 0;
1517 values = format!("{values},{bool_val}");
1518 } else if col_type.contains("int") {
1519 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1520 } else {
1521 values = format!("{values},{value}");
1522 }
1523 } else if value.is_boolean() {
1524 values = format!("{values},{value}");
1525 continue;
1526 } else {
1527 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1528 }
1529 }
1530 values = format!("({})", values.trim_start_matches(","));
1531 array![item["id"].clone(), values]
1532 });
1533 }
1534 let (ids_list, mut values) = p.insert_all();
1535 values = values.trim_start_matches(",").to_string();
1536 let sql = format!(
1537 "INSERT INTO {} ({}) VALUES {};",
1538 self.params.table, fields, values
1539 );
1540
1541 if self.params.sql {
1542 return JsonValue::from(sql.clone());
1543 }
1544 let (state, data) = self.execute(sql.as_str());
1545 match state {
1546 true => match autoinc {
1547 true => data,
1548 false => JsonValue::from(ids_list),
1549 },
1550 false => {
1551 error!("insert_all: {data:?}");
1552 let _ = std::fs::write("pgsql_insert_all_error.log", format!("insert_all error:\n{:?}\n\nSQL:\n{}", data, sql));
1553 array![]
1554 }
1555 }
1556 }
1557 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1558 let fields_list = self.table_info(&self.params.table.clone());
1559 let mut fields = vec![];
1560 let mut values = vec![];
1561 if !self.params.autoinc && data["id"].is_empty() {
1562 let thread_id = format!("{:?}", std::thread::current().id());
1563 let thread_num: u64 = thread_id
1564 .trim_start_matches("ThreadId(")
1565 .trim_end_matches(")")
1566 .parse()
1567 .unwrap_or(0);
1568 data["id"] = format!(
1569 "{:X}{:X}",
1570 Local::now().timestamp_nanos_opt().unwrap_or(0),
1571 thread_num
1572 )
1573 .into();
1574 }
1575 for (field, value) in data.entries() {
1576 fields.push(format!("\"{}\"", field));
1577
1578 if value.is_string() {
1579 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1580 continue;
1581 } else if value.is_array() {
1582 if self.params.json[field].is_empty() {
1583 let array = value
1584 .members()
1585 .map(|x| x.as_str().unwrap_or(""))
1586 .collect::<Vec<&str>>()
1587 .join(",");
1588 values.push(format!("'{array}'"));
1589 } else {
1590 let json = value.to_string();
1591 let json = json.replace("'", "''");
1592 values.push(format!("'{json}'"));
1593 }
1594 continue;
1595 } else if value.is_object() {
1596 if self.params.json[field].is_empty() {
1597 values.push(format!("'{value}'"));
1598 } else {
1599 let json = value.to_string();
1600 let json = json.replace("'", "''");
1601 values.push(format!("'{json}'"));
1602 }
1603 continue;
1604 } else if value.is_number() {
1605 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1606 if col_type == "boolean" {
1607 let bool_val = value.as_i64().unwrap_or(0) != 0;
1608 values.push(format!("{bool_val}"));
1609 } else if col_type.contains("int") {
1610 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1611 } else {
1612 values.push(format!("{value}"));
1613 }
1614 continue;
1615 } else if value.is_boolean() || value.is_null() {
1616 values.push(format!("{value}"));
1617 continue;
1618 } else {
1619 values.push(format!("'{value}'"));
1620 continue;
1621 }
1622 }
1623
1624 let conflict_cols: Vec<String> = conflict_fields
1625 .iter()
1626 .map(|f| format!("\"{}\"", f))
1627 .collect();
1628
1629 let update_set: Vec<String> = fields
1630 .iter()
1631 .filter(|f| {
1632 let name = f.trim_matches('"');
1633 !conflict_fields.contains(&name) && name != "id"
1634 })
1635 .map(|f| format!("{f}=EXCLUDED.{f}"))
1636 .collect();
1637
1638 let fields_str = fields.join(",");
1639 let values_str = values.join(",");
1640
1641 let sql = format!(
1642 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1643 self.params.table,
1644 fields_str,
1645 values_str,
1646 conflict_cols.join(","),
1647 update_set.join(",")
1648 );
1649 if self.params.sql {
1650 return JsonValue::from(sql.clone());
1651 }
1652 let (state, result) = self.execute(sql.as_str());
1653 match state {
1654 true => match self.params.autoinc {
1655 true => result.clone(),
1656 false => data["id"].clone(),
1657 },
1658 false => {
1659 let thread_id = format!("{:?}", thread::current().id());
1660 error!("upsert失败: {thread_id} {result:?} {sql}");
1661 JsonValue::from("")
1662 }
1663 }
1664 }
1665 fn update(&mut self, data: JsonValue) -> JsonValue {
1666 let fields_list = self.table_info(&self.params.table.clone());
1667 let mut values = vec![];
1668 for (field, value) in data.entries() {
1669 if value.is_string() {
1670 values.push(format!(
1671 "\"{}\"='{}'",
1672 field,
1673 value.to_string().replace("'", "''")
1674 ));
1675 } else if value.is_number() {
1676 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1677 if col_type == "boolean" {
1678 let bool_val = value.as_i64().unwrap_or(0) != 0;
1679 values.push(format!("\"{field}\"= {bool_val}"));
1680 } else if col_type.contains("int") {
1681 values.push(format!(
1682 "\"{}\"= {}",
1683 field,
1684 value.as_f64().unwrap_or(0.0) as i64
1685 ));
1686 } else {
1687 values.push(format!("\"{field}\"= {value}"));
1688 }
1689 } else if value.is_array() {
1690 if self.params.json[field].is_empty() {
1691 let array = value
1692 .members()
1693 .map(|x| x.as_str().unwrap_or(""))
1694 .collect::<Vec<&str>>()
1695 .join(",");
1696 values.push(format!("\"{field}\"='{array}'"));
1697 } else {
1698 let json = value.to_string();
1699 let json = json.replace("'", "''");
1700 values.push(format!("\"{field}\"='{json}'"));
1701 }
1702 continue;
1703 } else if value.is_object() {
1704 if self.params.json[field].is_empty() {
1705 values.push(format!("\"{field}\"='{value}'"));
1706 } else {
1707 if value.is_empty() {
1708 values.push(format!("\"{field}\"=''"));
1709 continue;
1710 }
1711 let json = value.to_string();
1712 let json = json.replace("'", "''");
1713 values.push(format!("\"{field}\"='{json}'"));
1714 }
1715 continue;
1716 } else if value.is_boolean() {
1717 values.push(format!("\"{field}\"= {value}"));
1718 } else {
1719 values.push(format!("\"{field}\"=\"{value}\""));
1720 }
1721 }
1722
1723 for (field, value) in self.params.inc_dec.entries() {
1724 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1725 }
1726 if !self.params.update_column.is_empty() {
1727 values.extend(self.params.update_column.clone());
1728 }
1729 let values = values.join(",");
1730
1731 let sql = format!(
1732 "UPDATE {} SET {} {};",
1733 self.params.table.clone(),
1734 values,
1735 self.params.where_sql()
1736 );
1737 if self.params.sql {
1738 return JsonValue::from(sql.clone());
1739 }
1740 let (state, data) = self.execute(sql.as_str());
1741 if state {
1742 data
1743 } else {
1744 let thread_id = format!("{:?}", thread::current().id());
1745 error!("update: {thread_id} {data:?} {sql}");
1746 0.into()
1747 }
1748 }
1749 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1750 let fields_list = self.table_info(&self.params.table.clone());
1751 let mut values = vec![];
1752
1753 let mut ids = vec![];
1754 for (field, _) in data[0].entries() {
1755 if field == "id" {
1756 continue;
1757 }
1758 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1759 let mut fields = vec![];
1760 for row in data.members() {
1761 let value = row[field].clone();
1762 let id = row["id"].clone();
1763 ids.push(id.clone());
1764 if value.is_string() {
1765 fields.push(format!(
1766 "WHEN '{}' THEN '{}'",
1767 id,
1768 value.to_string().replace("'", "''")
1769 ));
1770 } else if value.is_array() || value.is_object() {
1771 if self.params.json[field].is_empty() {
1772 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1773 } else {
1774 let json = value.to_string();
1775 let json = json.replace("'", "''");
1776 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1777 }
1778 continue;
1779 } else if value.is_number() {
1780 if col_type == "boolean" {
1781 let bool_val = value.as_i64().unwrap_or(0) != 0;
1782 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1783 } else {
1784 fields.push(format!("WHEN '{id}' THEN {value}"));
1785 }
1786 } else if value.is_boolean() || value.is_null() {
1787 fields.push(format!("WHEN '{id}' THEN {value}"));
1788 } else {
1789 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1790 }
1791 }
1792 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1793 }
1794 self.where_and("id", "in", ids.into());
1795 for (field, value) in self.params.inc_dec.entries() {
1796 values.push(format!("{} = {}", field, value.to_string().clone()));
1797 }
1798
1799 let values = values.join(",");
1800 let sql = format!(
1801 "UPDATE {} SET {} {} {};",
1802 self.params.table.clone(),
1803 values,
1804 self.params.where_sql(),
1805 self.params.page_limit_sql()
1806 );
1807 if self.params.sql {
1808 return JsonValue::from(sql.clone());
1809 }
1810 let (state, data) = self.execute(sql.as_str());
1811 if state {
1812 data
1813 } else {
1814 error!("update_all: {data:?}");
1815 JsonValue::from(0)
1816 }
1817 }
1818
1819 fn delete(&mut self) -> JsonValue {
1820 let sql = format!(
1821 "delete FROM {} {} {};",
1822 self.params.table.clone(),
1823 self.params.where_sql(),
1824 self.params.page_limit_sql()
1825 );
1826 if self.params.sql {
1827 return JsonValue::from(sql.clone());
1828 }
1829 let (state, data) = self.execute(sql.as_str());
1830 match state {
1831 true => data,
1832 false => {
1833 error!("delete 失败>>> {data:?}");
1834 JsonValue::from(0)
1835 }
1836 }
1837 }
1838
1839 fn transaction(&mut self) -> bool {
1840 let thread_id = format!("{:?}", thread::current().id());
1841 let key = format!("{}{}", self.default, thread_id);
1842
1843 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1844 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1845 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1846 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1847 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1848 return true;
1849 }
1850
1851 let mut conn = None;
1853 for attempt in 0..3u8 {
1854 match self.client.get_connect_for_transaction() {
1855 Ok(mut c) => {
1856 if c.is_valid() {
1857 conn = Some(c);
1858 break;
1859 }
1860 warn!("事务连接无效(第{}次)", attempt + 1);
1861 self.client.release_transaction_conn();
1862 }
1863 Err(e) => {
1864 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1865 }
1866 }
1867 if attempt < 2 {
1868 thread::sleep(std::time::Duration::from_millis(200));
1869 }
1870 }
1871 let mut conn = match conn {
1872 Some(c) => c,
1873 None => {
1874 error!("获取事务连接重试耗尽");
1875 return false;
1876 }
1877 };
1878
1879 if let Err(e) = conn.execute("START TRANSACTION") {
1880 error!("启动事务失败: {e}");
1881 self.client.release_transaction_conn();
1882 return false;
1883 }
1884
1885
1886 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1887 true
1888 }
1889 fn commit(&mut self) -> bool {
1890 let thread_id = format!("{:?}", thread::current().id());
1891 let key = format!("{}{}", self.default, thread_id);
1892
1893 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1894 error!("commit: 没有活跃的事务");
1895 return false;
1896 }
1897
1898 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1899 if depth > 1 {
1900 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1901 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1902 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1903 return true;
1904 }
1905
1906 let commit_result =
1907 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1908
1909 let success = match commit_result {
1910 Some(Ok(_)) => true,
1911 Some(Err(e)) => {
1912 error!("提交事务失败: {e}");
1913 false
1914 }
1915 None => {
1916 error!("提交事务失败: 未找到连接");
1917 false
1918 }
1919 };
1920
1921 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1922 self.client.release_transaction_conn();
1923 success
1924 }
1925
1926 fn rollback(&mut self) -> bool {
1927 let thread_id = format!("{:?}", thread::current().id());
1928 let key = format!("{}{}", self.default, thread_id);
1929
1930 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1931 error!("rollback: 没有活跃的事务");
1932 return false;
1933 }
1934
1935 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1936 if depth > 1 {
1937 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1938 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1939 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1940 return true;
1941 }
1942
1943 let rollback_result =
1944 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1945
1946 let success = match rollback_result {
1947 Some(Ok(_)) => true,
1948 Some(Err(e)) => {
1949 error!("回滚失败: {e}");
1950 false
1951 }
1952 None => {
1953 error!("回滚失败: 未找到连接");
1954 false
1955 }
1956 };
1957
1958 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1959 self.client.release_transaction_conn();
1960 success
1961 }
1962
1963 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1964 let (state, data) = self.query(sql);
1965 match state {
1966 true => Ok(data),
1967 false => Err(data.to_string()),
1968 }
1969 }
1970
1971 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1972 let (state, data) = self.execute(sql);
1973 match state {
1974 true => Ok(data),
1975 false => Err(data.to_string()),
1976 }
1977 }
1978
1979 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1980 self.params.inc_dec[field] = format!("{field} + {num}").into();
1981 self
1982 }
1983
1984 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1985 self.params.inc_dec[field] = format!("{field} - {num}").into();
1986 self
1987 }
1988 fn buildsql(&mut self) -> String {
1989 self.fetch_sql();
1990 let sql = self.select().to_string();
1991 format!("( {} ) {}", sql, self.params.table)
1992 }
1993
1994 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1995 for field in fields {
1996 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1997 }
1998 self
1999 }
2000
2001 fn join(
2002 &mut self,
2003 main_table: &str,
2004 main_fields: &str,
2005 right_table: &str,
2006 right_fields: &str,
2007 ) -> &mut Self {
2008 let main_table = if main_table.is_empty() {
2009 self.params.table.clone()
2010 } else {
2011 main_table.to_string()
2012 };
2013 self.params.join_table = right_table.to_string();
2014 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2015 self
2016 }
2017
2018 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2019 let main_fields = if main_fields.is_empty() {
2020 "id"
2021 } else {
2022 main_fields
2023 };
2024 let second_fields = if second_fields.is_empty() {
2025 self.params.table.clone()
2026 } else {
2027 second_fields.to_string().clone()
2028 };
2029 let sec_table_name = format!("{}{}", table, "_2");
2030 let second_table = format!("{} {}", table, sec_table_name.clone());
2031 self.params.join_table = sec_table_name.clone();
2032 self.params.join.push(format!(
2033 " INNER JOIN {} ON {}.{} = {}.{}",
2034 second_table, self.params.table, main_fields, sec_table_name, second_fields
2035 ));
2036 self
2037 }
2038
2039 fn join_right(
2040 &mut self,
2041 main_table: &str,
2042 main_fields: &str,
2043 right_table: &str,
2044 right_fields: &str,
2045 ) -> &mut Self {
2046 let main_table = if main_table.is_empty() {
2047 self.params.table.clone()
2048 } else {
2049 main_table.to_string()
2050 };
2051 self.params.join_table = right_table.to_string();
2052 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2053 self
2054 }
2055
2056 fn join_full(
2057 &mut self,
2058 main_table: &str,
2059 main_fields: &str,
2060 right_table: &str,
2061 right_fields: &str,
2062 ) -> &mut Self {
2063 let main_table = if main_table.is_empty() {
2064 self.params.table.clone()
2065 } else {
2066 main_table.to_string()
2067 };
2068 self.params.join_table = right_table.to_string();
2069 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2070 self
2071 }
2072
2073 fn union(&mut self, sub_sql: &str) -> &mut Self {
2074 self.params.unions.push(format!("UNION {sub_sql}"));
2075 self
2076 }
2077
2078 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2079 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2080 self
2081 }
2082
2083 fn lock_for_update(&mut self) -> &mut Self {
2084 self.params.lock_mode = "FOR UPDATE".to_string();
2085 self
2086 }
2087
2088 fn lock_for_share(&mut self) -> &mut Self {
2089 self.params.lock_mode = "FOR SHARE".to_string();
2090 self
2091 }
2092}