1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use crate::TABLE_FIELDS;
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
12#[derive(Clone)]
13pub struct Pgsql {
14 pub connection: Connection,
16 pub default: String,
18 pub params: Params,
19 pub client: Pools,
20}
21
22impl Pgsql {
23 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24 let port = connection
25 .hostport
26 .parse::<i32>()
27 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29 let cp_connection = connection.clone();
30 let config = object! {
31 debug: cp_connection.debug,
32 username: cp_connection.username,
33 userpass: cp_connection.userpass,
34 database: cp_connection.database,
35 hostname: cp_connection.hostname,
36 hostport: port,
37 charset: cp_connection.charset.str(),
38 pool_max: cp_connection.pool.max_connections,
39 };
40 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
41
42 let pools = pgsql.pools()?;
43 Ok(Self {
44 connection,
45 default: default.clone(),
46 params: Params::default("pgsql"),
47 client: pools,
48 })
49 }
50
51 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
52 let thread_id = format!("{:?}", thread::current().id());
53 let key = format!("{}{}", self.default, thread_id);
54
55 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
56 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
57
58 match result {
59 Some(Ok(e)) => {
60 if self.connection.debug {
61 info!("查询成功: {} {}", thread_id.clone(), sql);
62 }
63 (true, e.rows)
64 }
65 Some(Err(e)) => {
66 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
67 (false, JsonValue::from(e.to_string()))
68 }
69 None => {
70 error!("事务查询失败: 未找到事务连接 {thread_id}");
71 (false, JsonValue::from("未找到事务连接"))
72 }
73 }
74 } else {
75 let mut guard = match self.client.get_guard() {
76 Ok(g) => g,
77 Err(e) => {
78 error!(
80 "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
81 );
82 return (false, JsonValue::from(e.to_string()));
83 }
84 };
85 match guard.conn().query(sql) {
86 Ok(e) => {
87 if self.connection.debug {
88 info!("查询成功: {} {}", thread_id.clone(), sql);
89 }
90 (true, e.rows)
91 }
92 Err(ref e) if Self::is_retriable_error(e) => {
93 guard.discard();
96 self.client.flush_idle();
98 warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
99 thread::sleep(std::time::Duration::from_millis(200));
100 let mut guard2 = match self.client.get_guard() {
101 Ok(g) => g,
102 Err(e) => {
103 error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
104 return (false, JsonValue::from(e.to_string()));
105 }
106 };
107 match guard2.conn().query(sql) {
108 Ok(e) => {
109 if self.connection.debug {
110 info!("查询成功(重试): {} {}", thread_id.clone(), sql);
111 }
112 (true, e.rows)
113 }
114 Err(e) => {
115 error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
116 (false, JsonValue::from(e.to_string()))
117 }
118 }
119 }
120 Err(e) => {
121 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
122 (false, JsonValue::from(e.to_string()))
123 }
124 }
125 }
126 }
127 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
128 let thread_id = format!("{:?}", thread::current().id());
129 let key = format!("{}{}", self.default, thread_id);
130
131 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
132 let lock_timeout = self.dynamic_table_lock_timeout(&key);
133 if self.params.table.is_empty() {
134 warn!("事务写操作未设置表名,跳过应用层表锁: {thread_id}");
135 } else if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
136 &self.params.table,
137 &key,
138 lock_timeout,
139 ) {
140 error!("获取表锁超时: {} {}", self.params.table, thread_id);
141 return (false, JsonValue::from("table lock timeout"));
142 }
143 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
144
145 match result {
146 Some(Ok(e)) => {
147 if self.connection.debug {
148 info!("提交成功: {} {}", thread_id.clone(), sql);
149 }
150 if sql.contains("INSERT") {
151 (true, e.rows)
152 } else {
153 (true, e.affect_count.into())
154 }
155 }
156 Some(Err(e)) => {
157 error!("事务提交失败: {thread_id} {e}");
158 (false, JsonValue::from(e.to_string()))
159 }
160 None => {
161 error!("事务执行失败: 未找到事务连接 {thread_id}");
162 (false, JsonValue::from("未找到事务连接"))
163 }
164 }
165 } else {
166 let mut guard = match self.client.get_guard() {
168 Ok(g) => g,
169 Err(e) => {
170 error!(
171 "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
172 );
173 return (false, JsonValue::from(e.to_string()));
174 }
175 };
176 match guard.conn().execute(sql) {
177 Ok(e) => {
178 if self.connection.debug {
179 info!("提交成功: {} {}", thread_id.clone(), sql);
180 }
181 if sql.contains("INSERT") {
182 (true, e.rows)
183 } else {
184 (true, e.affect_count.into())
185 }
186 }
187 Err(ref e) if Self::is_retriable_error(e) => {
188 guard.discard();
191 self.client.flush_idle();
193 warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
194 thread::sleep(std::time::Duration::from_millis(200));
195 let mut guard2 = match self.client.get_guard() {
196 Ok(g) => g,
197 Err(e) => {
198 error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
199 return (false, JsonValue::from(e.to_string()));
200 }
201 };
202 match guard2.conn().execute(sql) {
203 Ok(e) => {
204 if self.connection.debug {
205 info!("提交成功(重试): {} {}", thread_id.clone(), sql);
206 }
207 if sql.contains("INSERT") {
208 (true, e.rows)
209 } else {
210 (true, e.affect_count.into())
211 }
212 }
213 Err(e) => {
214 error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
215 (false, JsonValue::from(e.to_string()))
216 }
217 }
218 }
219 Err(e) => {
220 error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
221 (false, JsonValue::from(e.to_string()))
222 }
223 }
224 }
225 }
226
227 fn is_retriable_error(e: &PgsqlError) -> bool {
229 matches!(
230 e,
231 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
232 )
233 }
234
235 fn dynamic_table_lock_timeout(&self, key: &str) -> std::time::Duration {
236 let pool = &self.connection.pool;
237 let min_secs = pool.connect_timeout_secs.max(1);
238 let base_secs = pool.write_timeout_secs.max(min_secs);
239 let max_secs = pool
240 .read_timeout_secs
241 .saturating_add(pool.write_timeout_secs)
242 .max(base_secs.saturating_add(10))
243 .max(min_secs);
244
245 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(key).max(1) as u64;
246 let (conn_count, lock_count) = PGSQL_TRANSACTION_MANAGER.stats();
247 let pressure = (conn_count + lock_count) as u64;
248 let pressure_bonus = (pressure / 2).min(10);
249 let depth_bonus = depth.saturating_sub(1).min(5);
250
251 let timeout_secs = base_secs
252 .saturating_add(pressure_bonus)
253 .saturating_add(depth_bonus)
254 .clamp(min_secs, max_secs);
255
256 std::time::Duration::from_secs(timeout_secs)
257 }
258}
259
260impl DbMode for Pgsql {
261 fn database_tables(&mut self) -> JsonValue {
262 let sql = "SHOW TABLES".to_string();
263 match self.sql(sql.as_str()) {
264 Ok(e) => {
265 let mut list = vec![];
266 for item in e.members() {
267 for (_, value) in item.entries() {
268 list.push(value.clone());
269 }
270 }
271 list.into()
272 }
273 Err(_) => {
274 array![]
275 }
276 }
277 }
278
279 fn database_create(&mut self, name: &str) -> bool {
280 let sql = format!("CREATE DATABASE {name}");
281
282 let (state, data) = self.execute(sql.as_str());
283 match state {
284 true => data.as_bool().unwrap_or(true),
285 false => {
286 error!("创建数据库失败: {data:?}");
287 false
288 }
289 }
290 }
291
292 fn truncate(&mut self, table: &str) -> bool {
293 let sql = format!("TRUNCATE TABLE {table}");
294 let (state, _) = self.execute(sql.as_str());
295 state
296 }
297}
298
299impl Mode for Pgsql {
300 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
301 let mut sql = String::new();
302 let mut comments = vec![];
303
304 if !options.table_unique.is_empty() {
305 let full_name = format!(
306 "{}_unique_{}",
307 options.table_name,
308 options.table_unique.join("_")
309 );
310 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
311 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
312 let unique = format!(
313 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
314 name,
315 options.table_name,
316 options.table_unique.join(",")
317 );
318 comments.push(unique);
319 }
320
321 for row in options.table_index.iter() {
322 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
323 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
324 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
325 let index = format!(
326 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
327 name,
328 options.table_name,
329 row.join(",")
330 );
331 comments.push(index);
332 }
333
334 for (name, field) in options.table_fields.entries_mut() {
335 field["table_name"] = options.table_name.clone().into();
336 let row = br_fields::field("pgsql", name, field.clone());
337 let (col_sql, meta) = if let Some(idx) = row.find("--") {
338 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
339 } else {
340 (row.trim(), None)
341 };
342 if let Some(meta) = meta {
343 comments.push(format!(
344 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
345 options.table_name, name, meta
346 ));
347 }
348 sql = format!("{} {},\r\n", sql, col_sql);
349 }
350
351 let primary_key = format!(
352 "CONSTRAINT {}_{} PRIMARY KEY ({})",
353 options.table_name, options.table_key, options.table_key
354 );
355 let sql = format!(
356 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
357 options.table_name,
358 sql.trim_end_matches(",\r\n"),
359 primary_key
360 );
361 comments.insert(0, sql);
362
363 for (_name, field) in options.table_fields.entries() {
364 let _ = field["mode"].as_str();
365 }
366
367 if self.params.sql {
368 let info = comments.join("\r\n");
369 return JsonValue::from(info);
370 }
371 for comment in comments {
372 let (state, _) = self.execute(comment.as_str());
373 match state {
374 true => {}
375 false => {
376 return JsonValue::from(state);
377 }
378 }
379 }
380 JsonValue::from(true)
381 }
382
383 fn table_update(&mut self, options: TableOptions) -> JsonValue {
384 let cache_key = format!("{}{}", self.default, options.table_name);
386 let table_fields_guard = match TABLE_FIELDS.read() {
387 Ok(g) => g,
388 Err(e) => e.into_inner(),
389 };
390 if table_fields_guard.get(&cache_key).is_some() {
391 drop(table_fields_guard);
392 let mut table_fields_guard = match TABLE_FIELDS.write() {
393 Ok(g) => g,
394 Err(e) => e.into_inner(),
395 };
396 table_fields_guard.remove(&cache_key);
397 } else {
398 drop(table_fields_guard);
399 }
400 let fields_list = self.table_info(&options.table_name);
401 let mut put = vec![];
402 let mut add = vec![];
403 let mut del = vec![];
404 let mut comments = vec![];
405
406 for (key, _) in fields_list.entries() {
407 if options.table_fields[key].is_empty() {
408 del.push(key);
409 }
410 }
411 for (name, field) in options.table_fields.entries() {
412 if !fields_list[name].is_empty() {
413 let old_info = &fields_list[name];
414 let new_field_sql = br_fields::field("pgsql", name, field.clone());
415
416 let old_comment = old_info["comment"].as_str().unwrap_or("");
417 let old_type = old_info["type"].as_str().unwrap_or("");
418
419 let new_comment = if let Some(idx) = new_field_sql.find("--") {
420 new_field_sql[idx + 2..].trim()
421 } else {
422 ""
423 };
424
425 let comment_matches =
426 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
427 let old_parts: Vec<&str> = old_comment.split('|').collect();
428 let new_parts: Vec<&str> = new_comment.split('|').collect();
429 if old_parts.len() >= 4 && new_parts.len() >= 4 {
430 old_parts[..4] == new_parts[..4]
431 } else {
432 old_comment == new_comment
433 }
434 } else if !old_comment.is_empty() && !new_comment.is_empty() {
435 let old_parts: Vec<&str> = old_comment.split('|').collect();
436 let new_parts: Vec<&str> = new_comment.split('|').collect();
437 if old_parts.len() >= 2
438 && new_parts.len() >= 2
439 && old_parts.len() == new_parts.len()
440 {
441 let old_filtered: Vec<&str> = old_parts
442 .iter()
443 .filter(|v| **v != "true" && **v != "false")
444 .copied()
445 .collect();
446 let new_filtered: Vec<&str> = new_parts
447 .iter()
448 .filter(|v| **v != "true" && **v != "false")
449 .copied()
450 .collect();
451 old_filtered == new_filtered
452 } else {
453 old_comment == new_comment
454 }
455 } else {
456 old_comment == new_comment
457 };
458
459 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
460 let new_type = if sql_parts.len() > 1 {
461 sql_parts[1].to_lowercase()
462 } else {
463 String::new()
464 };
465
466 let type_matches = match old_type {
467 "integer" => {
468 new_type.contains("int")
469 && !new_type.contains("bigint")
470 && !new_type.contains("smallint")
471 }
472 "bigint" => new_type.contains("bigint"),
473 "smallint" => new_type.contains("smallint"),
474 "boolean" => new_type.contains("boolean"),
475 "text" => new_type.contains("text"),
476 "character varying" => {
477 if !new_type.contains("varchar") {
478 false
479 } else {
480 let old_len = old_info["max_length"].as_i64().unwrap_or(0);
481 let new_len = new_type
482 .trim_start_matches("varchar(")
483 .trim_end_matches(')')
484 .parse::<i64>()
485 .unwrap_or(0);
486 let matched = old_len == new_len || new_len == 0;
487 if !matched {
488 log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
489 }
490 old_len == new_len || new_len == 0
491 }
492 }
493 "character" => new_type.contains("char") && !new_type.contains("varchar"),
494 "numeric" => {
495 if !(new_type.contains("numeric") || new_type.contains("decimal")) {
496 false
497 } else {
498 let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
499 let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
500 let inner = new_type
501 .replace("numeric(", "")
502 .replace("decimal(", "")
503 .replace(')', "");
504 let parts: Vec<&str> = inner.split(',').collect();
505 let new_prec = parts
506 .first()
507 .and_then(|s| s.trim().parse::<i64>().ok())
508 .unwrap_or(0);
509 let new_scale = parts
510 .get(1)
511 .and_then(|s| s.trim().parse::<i64>().ok())
512 .unwrap_or(0);
513 old_prec == new_prec && old_scale == new_scale
514 }
515 }
516 "double precision" => {
517 new_type.contains("double") || new_type.contains("float8")
518 }
519 "real" => new_type.contains("real") || new_type.contains("float4"),
520 "timestamp without time zone" | "timestamp with time zone" => {
521 new_type.contains("timestamp")
522 }
523 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
524 "time without time zone" | "time with time zone" => {
525 new_type.contains("time") && !new_type.contains("timestamp")
526 }
527 "json" | "jsonb" => new_type.contains("json"),
528 "uuid" => new_type.contains("uuid"),
529 "bytea" => new_type.contains("bytea"),
530 _ => old_type == new_type,
531 };
532
533 if type_matches && comment_matches {
534 continue;
535 }
536
537 log::debug!(
538 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
539 options.table_name,
540 name,
541 type_matches,
542 old_type,
543 new_type,
544 comment_matches
545 );
546 put.push(name);
547 } else {
548 add.push(name);
549 }
550 }
551
552 for name in add.iter() {
553 let name = name.to_string();
554 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
555 let rows = row.split("--").collect::<Vec<&str>>();
556 comments.push(format!(
557 r#"ALTER TABLE "{}" add {};"#,
558 options.table_name,
559 rows[0].trim()
560 ));
561 if rows.len() > 1 {
562 comments.push(format!(
563 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
564 options.table_name,
565 name,
566 rows[1].trim()
567 ));
568 }
569 }
570 for name in del.iter() {
571 comments.push(format!(
572 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
573 options.table_name, name
574 ));
575 }
576 for name in put.iter() {
577 let name = name.to_string();
578 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
579 let rows = row.split("--").collect::<Vec<&str>>();
580
581 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
582
583 if sql[1].contains("BOOLEAN") {
584 let text = format!(
585 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
586 options.table_name, name
587 );
588 comments.push(text.clone());
589 let text = format!(
590 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
591 options.table_name, name, sql[1]
592 );
593 comments.push(text.clone());
594 } else {
595 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
596 let new_type_lower = sql[1].to_lowercase();
597 let is_date_to_numeric = (old_col_type == "date"
598 || old_col_type.contains("timestamp"))
599 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
600 if is_date_to_numeric {
601 comments.push(format!(
602 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
603 options.table_name, name
604 ));
605 comments.push(format!(
606 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
607 options.table_name, name, sql[1], name, name, name
608 ));
609 } else {
610 let text = format!(
611 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
612 options.table_name, name, sql[1]
613 );
614 comments.push(text.clone());
615 }
616 };
617
618 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
619 let default_value = rows[0][default_pos + 9..].trim();
620 if !default_value.is_empty() {
621 comments.push(format!(
622 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
623 options.table_name, name, default_value
624 ));
625 }
626 }
627 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
630 .as_str()
631 .unwrap_or("YES");
632 let old_is_required = old_is_nullable == "NO";
633
634 if old_is_required && name != options.table_key {
636 comments.push(format!(
637 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
638 options.table_name, name
639 ));
640 }
641
642 if rows.len() > 1 {
643 comments.push(format!(
644 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
645 options.table_name,
646 name,
647 rows[1].trim()
648 ));
649 }
650 }
651
652 let mut unique_new = vec![];
653 let mut index_new = vec![];
654 let mut primary_key = vec![];
655 let (_, index_list) = self.query(
656 format!(
657 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
658 options.table_name
659 )
660 .as_str(),
661 );
662 for item in index_list.members() {
663 let key_name = item["indexname"].as_str().unwrap_or("");
664 let indexdef = item["indexdef"].to_string();
665
666 if indexdef.contains(
667 format!(
668 "CREATE UNIQUE INDEX {}_{} ON",
669 options.table_name, options.table_key
670 )
671 .as_str(),
672 ) {
673 primary_key.push(key_name.to_string());
674 continue;
675 }
676 if indexdef.contains("CREATE UNIQUE INDEX") {
677 unique_new.push(key_name.to_string());
678 continue;
679 }
680 if indexdef.contains("CREATE INDEX") {
681 index_new.push(key_name.to_string());
682 continue;
683 }
684 }
685
686 if !options.table_unique.is_empty() {
687 let full_name = format!(
688 "{}_unique_{}",
689 options.table_name,
690 options.table_unique.join("_")
691 );
692 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
693 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
694 let unique = format!(
695 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
696 name,
697 options.table_name,
698 options.table_unique.join(",")
699 );
700 if !unique_new.contains(&name) {
701 comments.push(unique);
702 }
703 unique_new.retain(|x| *x != name);
704 }
705
706 for row in options.table_index.iter() {
707 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
708 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
709 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
710 let index = format!(
711 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
712 name,
713 options.table_name,
714 row.join(",")
715 );
716 if !index_new.contains(&name) {
717 comments.push(index);
718 }
719 index_new.retain(|x| *x != name);
720 }
721
722 for item in unique_new {
723 if item.ends_with("_pkey") {
724 continue;
725 }
726 if item.starts_with("unique_") {
727 comments.push(format!(
728 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
729 options.table_name,
730 item.clone()
731 ));
732 } else {
733 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
734 }
735 }
736 for item in index_new {
737 if item.ends_with("_pkey") {
738 continue;
739 }
740 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
741 }
742
743 if self.params.sql {
744 return JsonValue::from(comments.join(""));
745 }
746
747 if comments.is_empty() {
748 return JsonValue::from(-1);
749 }
750
751 for item in comments.iter() {
752 let (state, res) = self.execute(item.as_str());
753 match state {
754 true => {}
755 false => {
756 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
757 return JsonValue::from(0);
758 }
759 }
760 }
761 JsonValue::from(1)
762 }
763
764 fn table_info(&mut self, table: &str) -> JsonValue {
765 let cache_key = format!("{}{}", self.default, table);
767 let table_fields_guard = match TABLE_FIELDS.read() {
768 Ok(g) => g,
769 Err(e) => e.into_inner(),
770 };
771 if let Some(cached) = table_fields_guard.get(&cache_key) {
772 return cached.clone();
773 }
774 drop(table_fields_guard);
775 let sql = format!(
776 "SELECT COL.COLUMN_NAME,
777 COL.DATA_TYPE,
778 COL.IS_NULLABLE,
779 COL.CHARACTER_MAXIMUM_LENGTH,
780 COL.NUMERIC_PRECISION,
781 COL.NUMERIC_SCALE,
782 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
783 LEFT JOIN
784 pg_catalog.pg_description DESCRIPTION
785 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
786 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
787 let (state, data) = self.query(sql.as_str());
788 let mut list = object! {};
789 if state {
790 for item in data.members() {
791 let mut row = object! {};
792 row["field"] = item["column_name"].clone();
793 row["comment"] = item["comment"].clone();
794 row["type"] = item["data_type"].clone();
795 row["is_nullable"] = item["is_nullable"].clone();
796 row["max_length"] = item["character_maximum_length"].clone();
797 row["numeric_precision"] = item["numeric_precision"].clone();
798 row["numeric_scale"] = item["numeric_scale"].clone();
799 if let Some(field_name) = row["field"].as_str() {
800 list[field_name] = row.clone();
801 }
802 }
803 let mut table_fields_guard = match TABLE_FIELDS.write() {
805 Ok(g) => g,
806 Err(e) => e.into_inner(),
807 };
808 table_fields_guard.insert(cache_key, list.clone());
809 list
810 } else {
811 list
812 }
813 }
814
815 fn table_is_exist(&mut self, name: &str) -> bool {
816 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
817 let (state, data) = self.query(sql.as_str());
818 match state {
819 true => {
820 for item in data.members() {
821 if item.has_key("exists") {
822 return item["exists"].as_bool().unwrap_or(false);
823 }
824 }
825 false
826 }
827 false => false,
828 }
829 }
830
831 fn table(&mut self, name: &str) -> &mut Pgsql {
832 self.params = Params::default(self.connection.mode.str().as_str());
833 let table_name = format!("{}{}", self.connection.prefix, name);
834 if !super::sql_safety::validate_table_name(&table_name) {
835 error!("Invalid table name: {}", name);
836 }
837 self.params.table = table_name.clone();
838 self.params.join_table = table_name;
839 self
840 }
841
842 fn change_table(&mut self, name: &str) -> &mut Self {
843 self.params.join_table = name.to_string();
844 self
845 }
846
847 fn autoinc(&mut self) -> &mut Self {
848 self.params.autoinc = true;
849 self
850 }
851
852 fn timestamps(&mut self) -> &mut Self {
853 self.params.timestamps = true;
854 self
855 }
856
857 fn fetch_sql(&mut self) -> &mut Self {
858 self.params.sql = true;
859 self
860 }
861
862 fn order(&mut self, field: &str, by: bool) -> &mut Self {
863 self.params.order[field] = {
864 if by {
865 "DESC"
866 } else {
867 "ASC"
868 }
869 }
870 .into();
871 self
872 }
873
874 fn group(&mut self, field: &str) -> &mut Self {
875 let fields: Vec<&str> = field.split(",").collect();
876 for field in fields.iter() {
877 let field = field.to_string();
878 self.params.group[field.as_str()] = field.clone().into();
879 self.params.fields[field.as_str()] = field.clone().into();
880 }
881 self
882 }
883
884 fn distinct(&mut self) -> &mut Self {
885 self.params.distinct = true;
886 self
887 }
888
889 fn json(&mut self, field: &str) -> &mut Self {
890 let list: Vec<&str> = field.split(",").collect();
891 for item in list.iter() {
892 self.params.json[item.to_string().as_str()] = item.to_string().into();
893 }
894 self
895 }
896
897 fn location(&mut self, field: &str) -> &mut Self {
898 let list: Vec<&str> = field.split(",").collect();
899 for item in list.iter() {
900 self.params.location[item.to_string().as_str()] = item.to_string().into();
901 }
902 self
903 }
904
905 fn field(&mut self, field: &str) -> &mut Self {
906 let list: Vec<&str> = field.split(",").collect();
907 let join_table = if self.params.join_table.is_empty() {
908 self.params.table.clone()
909 } else {
910 self.params.join_table.clone()
911 };
912 for item in list.iter() {
913 let lower = item.to_lowercase();
914 let is_expr = lower.contains("count(")
915 || lower.contains("sum(")
916 || lower.contains("avg(")
917 || lower.contains("max(")
918 || lower.contains("min(")
919 || lower.contains("case ");
920 if is_expr {
921 self.params.fields[item.to_string().as_str()] = (*item).into();
922 } else if item.contains(" as ") {
923 let text = item.split(" as ").collect::<Vec<&str>>();
924 self.params.fields[item.to_string().as_str()] =
925 format!("{}.{} as {}", join_table, text[0], text[1]).into();
926 } else {
927 self.params.fields[item.to_string().as_str()] =
928 format!("{join_table}.{item}").into();
929 }
930 }
931 self
932 }
933
934 fn field_raw(&mut self, expr: &str) -> &mut Self {
935 self.params.fields[expr] = expr.into();
936 self
937 }
938
939 fn hidden(&mut self, name: &str) -> &mut Self {
940 let hidden: Vec<&str> = name.split(",").collect();
941
942 let fields_list = self.table_info(self.params.clone().table.as_str());
943 let mut data = array![];
944 for item in fields_list.members() {
945 let _ = data.push(object! {
946 "name":item["field"].as_str().unwrap_or("")
947 });
948 }
949
950 for item in data.members() {
951 let name = item["name"].as_str().unwrap_or("");
952 if !hidden.contains(&name) {
953 self.params.fields[name] = name.into();
954 }
955 }
956 self
957 }
958
959 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
960 for f in field.split('|') {
961 if !super::sql_safety::validate_field_name(f) {
962 error!("Invalid field name: {}", f);
963 }
964 }
965 if !super::sql_safety::validate_compare_orator(compare) {
966 error!("Invalid compare operator: {}", compare);
967 }
968 let join_table = if self.params.join_table.is_empty() {
969 self.params.table.clone()
970 } else {
971 self.params.join_table.clone()
972 };
973 if value.is_boolean() {
974 let bool_val = value.as_bool().unwrap_or(false);
975 self.params
976 .where_and
977 .push(format!("{join_table}.{field} {compare} {bool_val}"));
978 return self;
979 }
980 match compare {
981 "between" => {
982 self.params.where_and.push(format!(
983 "{}.{} between '{}' AND '{}'",
984 join_table, field, value[0], value[1]
985 ));
986 }
987 "set" => {
988 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
989 let mut wheredata = vec![];
990 for item in list.iter() {
991 wheredata.push(format!(
992 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
993 ));
994 }
995 self.params
996 .where_and
997 .push(format!("({})", wheredata.join(" or ")));
998 }
999 "notin" => {
1000 let mut text = String::new();
1001 for item in value.members() {
1002 text = format!("{text},'{item}'");
1003 }
1004 text = text.trim_start_matches(",").into();
1005 self.params
1006 .where_and
1007 .push(format!("{join_table}.{field} not in ({text})"));
1008 }
1009 "is" => {
1010 self.params
1011 .where_and
1012 .push(format!("{join_table}.{field} is {value}"));
1013 }
1014 "isnot" => {
1015 self.params
1016 .where_and
1017 .push(format!("{join_table}.{field} is not {value}"));
1018 }
1019 "notlike" => {
1020 self.params
1021 .where_and
1022 .push(format!("{join_table}.{field} not like '{value}'"));
1023 }
1024 "in" => {
1025 if value.is_array() && value.is_empty() {
1026 self.params.where_and.push("1=0".to_string());
1027 return self;
1028 }
1029 let mut text = String::new();
1030 if value.is_array() {
1031 for item in value.members() {
1032 text = format!("{text},'{item}'");
1033 }
1034 } else if value.is_null() {
1035 text = format!("{text},null");
1036 } else {
1037 let value = value.as_str().unwrap_or("");
1038
1039 let value: Vec<&str> = value.split(",").collect();
1040 for item in value.iter() {
1041 text = format!("{text},'{item}'");
1042 }
1043 }
1044 text = text.trim_start_matches(",").into();
1045
1046 self.params
1047 .where_and
1048 .push(format!("{join_table}.{field} {compare} ({text})"));
1049 }
1050 "json_contains" => {
1054 if value.is_array() {
1055 if value.is_empty() {
1056 self.params.where_and.push("1=0".to_string());
1057 } else {
1058 let mut parts = vec![];
1059 for item in value.members() {
1060 let escaped = super::sql_safety::escape_string(&item.to_string());
1061 parts.push(format!(
1062 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1063 escaped
1064 ));
1065 }
1066 self.params
1067 .where_and
1068 .push(format!("({})", parts.join(" OR ")));
1069 }
1070 } else {
1071 let escaped = super::sql_safety::escape_string(&value.to_string());
1072 self.params.where_and.push(format!(
1073 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1074 escaped
1075 ));
1076 }
1077 }
1078 _ => {
1079 self.params
1080 .where_and
1081 .push(format!("{join_table}.{field} {compare} '{value}'"));
1082 }
1083 }
1084 self
1085 }
1086
1087 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1088 for f in field.split('|') {
1089 if !super::sql_safety::validate_field_name(f) {
1090 error!("Invalid field name: {}", f);
1091 }
1092 }
1093 if !super::sql_safety::validate_compare_orator(compare) {
1094 error!("Invalid compare operator: {}", compare);
1095 }
1096 let join_table = if self.params.join_table.is_empty() {
1097 self.params.table.clone()
1098 } else {
1099 self.params.join_table.clone()
1100 };
1101
1102 if value.is_boolean() {
1103 let bool_val = value.as_bool().unwrap_or(false);
1104 self.params
1105 .where_or
1106 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1107 return self;
1108 }
1109
1110 match compare {
1111 "between" => {
1112 self.params.where_or.push(format!(
1113 "{}.{} between '{}' AND '{}'",
1114 join_table, field, value[0], value[1]
1115 ));
1116 }
1117 "set" => {
1118 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1119 let mut wheredata = vec![];
1120 for item in list.iter() {
1121 wheredata.push(format!(
1122 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1123 ));
1124 }
1125 self.params
1126 .where_or
1127 .push(format!("({})", wheredata.join(" or ")));
1128 }
1129 "notin" => {
1130 let mut text = String::new();
1131 for item in value.members() {
1132 text = format!("{text},'{item}'");
1133 }
1134 text = text.trim_start_matches(",").into();
1135 self.params
1136 .where_or
1137 .push(format!("{join_table}.{field} not in ({text})"));
1138 }
1139 "is" => {
1140 self.params
1141 .where_or
1142 .push(format!("{join_table}.{field} is {value}"));
1143 }
1144 "isnot" => {
1145 self.params
1146 .where_or
1147 .push(format!("{join_table}.{field} is not {value}"));
1148 }
1149 "in" => {
1150 if value.is_array() && value.is_empty() {
1151 self.params.where_or.push("1=0".to_string());
1152 return self;
1153 }
1154 let mut text = String::new();
1155 if value.is_array() {
1156 for item in value.members() {
1157 text = format!("{text},'{item}'");
1158 }
1159 } else {
1160 let value = value.as_str().unwrap_or("");
1161 let value: Vec<&str> = value.split(",").collect();
1162 for item in value.iter() {
1163 text = format!("{text},'{item}'");
1164 }
1165 }
1166 text = text.trim_start_matches(",").into();
1167 self.params
1168 .where_or
1169 .push(format!("{join_table}.{field} {compare} ({text})"));
1170 }
1171 "json_contains" => {
1175 if value.is_array() {
1176 if value.is_empty() {
1177 self.params.where_or.push("1=0".to_string());
1178 } else {
1179 let mut parts = vec![];
1180 for item in value.members() {
1181 let escaped = super::sql_safety::escape_string(&item.to_string());
1182 parts.push(format!(
1183 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1184 escaped
1185 ));
1186 }
1187 self.params
1188 .where_or
1189 .push(format!("({})", parts.join(" OR ")));
1190 }
1191 } else {
1192 let escaped = super::sql_safety::escape_string(&value.to_string());
1193 self.params.where_or.push(format!(
1194 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1195 escaped
1196 ));
1197 }
1198 }
1199 _ => {
1200 self.params
1201 .where_or
1202 .push(format!("{join_table}.{field} {compare} '{value}'"));
1203 }
1204 }
1205 self
1206 }
1207
1208 fn where_raw(&mut self, expr: &str) -> &mut Self {
1209 self.params.where_and.push(expr.to_string());
1210 self
1211 }
1212
1213 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1214 self.params
1215 .where_and
1216 .push(format!("\"{field}\" IN ({sub_sql})"));
1217 self
1218 }
1219
1220 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1221 self.params
1222 .where_and
1223 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1224 self
1225 }
1226
1227 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1228 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1229 self
1230 }
1231
1232 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1233 self.params
1234 .where_and
1235 .push(format!("NOT EXISTS ({sub_sql})"));
1236 self
1237 }
1238
1239 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1240 self.params.where_column = format!(
1241 "{}.{} {} {}.{}",
1242 self.params.table, field_a, compare, self.params.table, field_b
1243 );
1244 self
1245 }
1246
1247 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1248 self.params
1249 .update_column
1250 .push(format!("{field_a} = {compare}"));
1251 self
1252 }
1253
1254 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1255 self.params.page = page;
1256 self.params.limit = limit;
1257 self
1258 }
1259
1260 fn limit(&mut self, count: i32) -> &mut Self {
1261 self.params.limit_only = count;
1262 self
1263 }
1264
1265 fn column(&mut self, field: &str) -> JsonValue {
1266 self.field(field);
1267 let sql = self.params.select_sql();
1268
1269 if self.params.sql {
1270 return JsonValue::from(sql);
1271 }
1272 let (state, data) = self.query(sql.as_str());
1273 match state {
1274 true => {
1275 let mut list = array![];
1276 for item in data.members() {
1277 if self.params.json[field].is_empty() {
1278 let _ = list.push(item[field].clone());
1279 } else {
1280 let data =
1281 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1282 let _ = list.push(data);
1283 }
1284 }
1285 list
1286 }
1287 false => {
1288 array![]
1289 }
1290 }
1291 }
1292
1293 fn count(&mut self) -> JsonValue {
1294 self.params.fields = json::object! {};
1295 self.params.fields["count"] = "count(*) as count".to_string().into();
1296 let sql = self.params.select_sql();
1297 if self.params.sql {
1298 return JsonValue::from(sql.clone());
1299 }
1300 let (state, data) = self.query(sql.as_str());
1301 if state {
1302 data[0]["count"].clone()
1303 } else {
1304 JsonValue::from(0)
1305 }
1306 }
1307
1308 fn max(&mut self, field: &str) -> JsonValue {
1309 self.params.fields[field] = format!("max({field}) as {field}").into();
1310 let sql = self.params.select_sql();
1311 if self.params.sql {
1312 return JsonValue::from(sql.clone());
1313 }
1314 let (state, data) = self.query(sql.as_str());
1315 if state {
1316 if data.len() > 1 {
1317 return data.clone();
1318 }
1319 data[0][field].clone()
1320 } else {
1321 JsonValue::from(0)
1322 }
1323 }
1324
1325 fn min(&mut self, field: &str) -> JsonValue {
1326 self.params.fields[field] = format!("min({field}) as {field}").into();
1327 let sql = self.params.select_sql();
1328 if self.params.sql {
1329 return JsonValue::from(sql.clone());
1330 }
1331 let (state, data) = self.query(sql.as_str());
1332 if state {
1333 if data.len() > 1 {
1334 return data;
1335 }
1336 data[0][field].clone()
1337 } else {
1338 JsonValue::from(0)
1339 }
1340 }
1341
1342 fn sum(&mut self, field: &str) -> JsonValue {
1343 self.params.fields[field] = format!("sum({field}) as {field}").into();
1344 let sql = self.params.select_sql();
1345 if self.params.sql {
1346 return JsonValue::from(sql.clone());
1347 }
1348 let (state, data) = self.query(sql.as_str());
1349 match state {
1350 true => {
1351 if data.len() > 1 {
1352 return data;
1353 }
1354 data[0][field].clone()
1355 }
1356 false => JsonValue::from(0),
1357 }
1358 }
1359
1360 fn avg(&mut self, field: &str) -> JsonValue {
1361 self.params.fields[field] = format!("avg({field}) as {field}").into();
1362 let sql = self.params.select_sql();
1363 if self.params.sql {
1364 return JsonValue::from(sql.clone());
1365 }
1366 let (state, data) = self.query(sql.as_str());
1367 if state {
1368 if data.len() > 1 {
1369 return data;
1370 }
1371 data[0][field].clone()
1372 } else {
1373 JsonValue::from(0)
1374 }
1375 }
1376
1377 fn having(&mut self, expr: &str) -> &mut Self {
1378 self.params.having.push(expr.to_string());
1379 self
1380 }
1381
1382 fn select(&mut self) -> JsonValue {
1383 let sql = self.params.select_sql();
1384 if self.params.sql {
1385 return JsonValue::from(sql.clone());
1386 }
1387 let (state, mut data) = self.query(sql.as_str());
1388 match state {
1389 true => {
1390 for (field, _) in self.params.json.entries() {
1391 for item in data.members_mut() {
1392 if !item[field].is_empty() {
1393 let json = item[field].to_string();
1394 item[field] = match json::parse(&json) {
1395 Ok(e) => e,
1396 Err(_) => JsonValue::from(json),
1397 };
1398 }
1399 }
1400 }
1401 data.clone()
1402 }
1403 false => array![],
1404 }
1405 }
1406
1407 fn find(&mut self) -> JsonValue {
1408 self.params.page = 1;
1409 self.params.limit = 1;
1410 let sql = self.params.select_sql();
1411 if self.params.sql {
1412 return JsonValue::from(sql.clone());
1413 }
1414 let (state, mut data) = self.query(sql.as_str());
1415 match state {
1416 true => {
1417 if data.is_empty() {
1418 return object! {};
1419 }
1420 for (field, _) in self.params.json.entries() {
1421 if !data[0][field].is_empty() {
1422 let json = data[0][field].to_string();
1423 let json = json::parse(&json).unwrap_or(array![]);
1424 data[0][field] = json;
1425 } else {
1426 data[0][field] = array![];
1427 }
1428 }
1429 data[0].clone()
1430 }
1431 false => {
1432 object! {}
1433 }
1434 }
1435 }
1436
1437 fn value(&mut self, field: &str) -> JsonValue {
1438 self.params.fields = object! {};
1439 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1440 self.params.page = 1;
1441 self.params.limit = 1;
1442 let sql = self.params.select_sql();
1443 if self.params.sql {
1444 return JsonValue::from(sql.clone());
1445 }
1446 let (state, mut data) = self.query(sql.as_str());
1447 match state {
1448 true => {
1449 for (field, _) in self.params.json.entries() {
1450 if !data[0][field].is_empty() {
1451 let json = data[0][field].to_string();
1452 let json = json::parse(&json).unwrap_or(array![]);
1453 data[0][field] = json;
1454 } else {
1455 data[0][field] = array![];
1456 }
1457 }
1458 data[0][field].clone()
1459 }
1460 false => {
1461 if self.connection.debug {
1462 info!("{data:?}");
1463 }
1464 JsonValue::Null
1465 }
1466 }
1467 }
1468
1469 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1470 let fields_list = self.table_info(&self.params.table.clone());
1471 let mut fields = vec![];
1472 let mut values = vec![];
1473 if !self.params.autoinc && data["id"].is_empty() {
1474 let thread_id = format!("{:?}", std::thread::current().id());
1475 let thread_num: u64 = thread_id
1476 .trim_start_matches("ThreadId(")
1477 .trim_end_matches(")")
1478 .parse()
1479 .unwrap_or(0);
1480 data["id"] = format!(
1481 "{:X}{:X}",
1482 Local::now().timestamp_nanos_opt().unwrap_or(0),
1483 thread_num
1484 )
1485 .into();
1486 }
1487 for (field, value) in data.entries() {
1488 fields.push(format!("\"{}\"", field));
1489
1490 if value.is_string() {
1491 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1492 continue;
1493 } else if value.is_array() {
1494 if self.params.json[field].is_empty() {
1495 let array = value
1496 .members()
1497 .map(|x| x.as_str().unwrap_or(""))
1498 .collect::<Vec<&str>>()
1499 .join(",");
1500 values.push(format!("'{}'", array.replace("'", "''")));
1501 } else {
1502 let json = value.to_string();
1503 let json = json.replace("'", "''");
1504 values.push(format!("'{json}'"));
1505 }
1506 continue;
1507 } else if value.is_object() {
1508 if self.params.json[field].is_empty() {
1509 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1510 } else {
1511 let json = value.to_string();
1512 let json = json.replace("'", "''");
1513 values.push(format!("'{json}'"));
1514 }
1515 continue;
1516 } else if value.is_number() {
1517 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1518 if col_type == "boolean" {
1519 let bool_val = value.as_i64().unwrap_or(0) != 0;
1520 values.push(format!("{bool_val}"));
1521 } else if col_type.contains("int") {
1522 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1523 } else {
1524 values.push(format!("{value}"));
1525 }
1526 continue;
1527 } else if value.is_boolean() || value.is_null() {
1528 values.push(format!("{value}"));
1529 continue;
1530 } else {
1531 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1532 continue;
1533 }
1534 }
1535 let fields = fields.join(",");
1536 let values = values.join(",");
1537
1538 let sql = format!(
1539 "INSERT INTO {} ({}) VALUES ({});",
1540 self.params.table, fields, values
1541 );
1542 if self.params.sql {
1543 return JsonValue::from(sql.clone());
1544 }
1545 let (state, ids) = self.execute(sql.as_str());
1546
1547 match state {
1548 true => match self.params.autoinc {
1549 true => ids.clone(),
1550 false => data["id"].clone(),
1551 },
1552 false => {
1553 let thread_id = format!("{:?}", thread::current().id());
1554 error!("添加失败: {thread_id} {ids:?} {sql}");
1555 JsonValue::from("")
1556 }
1557 }
1558 }
1559 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1560 let fields_list = self.table_info(&self.params.table.clone());
1561 let mut fields = String::new();
1562 if !self.params.autoinc && data[0]["id"].is_empty() {
1563 data[0]["id"] = "".into();
1564 }
1565 for (field, _) in data[0].entries() {
1566 fields = format!("{fields},\"{field}\"");
1567 }
1568 fields = fields.trim_start_matches(",").to_string();
1569
1570 let core_count = num_cpus::get();
1571 let mut p = pools::Pool::new(core_count * 4);
1572
1573 let autoinc = self.params.autoinc;
1574 for list in data.members() {
1575 let mut item = list.clone();
1576 let i = br_fields::str::Code::verification_code(3);
1577 let fields_list_new = fields_list.clone();
1578 p.execute(move |pcindex| {
1579 if !autoinc && item["id"].is_empty() {
1580 let id = format!(
1581 "{:X}{:X}{}",
1582 Local::now().timestamp_nanos_opt().unwrap_or(0),
1583 pcindex,
1584 i
1585 );
1586 item["id"] = id.into();
1587 }
1588 let mut values = "".to_string();
1589 for (field, value) in item.entries() {
1590 if value.is_string() {
1591 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1592 } else if value.is_number() {
1593 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1594 if col_type == "boolean" {
1595 let bool_val = value.as_i64().unwrap_or(0) != 0;
1596 values = format!("{values},{bool_val}");
1597 } else if col_type.contains("int") {
1598 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1599 } else {
1600 values = format!("{values},{value}");
1601 }
1602 } else if value.is_boolean() {
1603 values = format!("{values},{value}");
1604 continue;
1605 } else {
1606 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1607 }
1608 }
1609 values = format!("({})", values.trim_start_matches(","));
1610 array![item["id"].clone(), values]
1611 });
1612 }
1613 let (ids_list, mut values) = p.insert_all();
1614 values = values.trim_start_matches(",").to_string();
1615 let sql = format!(
1616 "INSERT INTO {} ({}) VALUES {};",
1617 self.params.table, fields, values
1618 );
1619
1620 if self.params.sql {
1621 return JsonValue::from(sql.clone());
1622 }
1623 let (state, data) = self.execute(sql.as_str());
1624 match state {
1625 true => match autoinc {
1626 true => data,
1627 false => JsonValue::from(ids_list),
1628 },
1629 false => {
1630 error!("insert_all: {data:?}");
1631
1632 array![]
1633 }
1634 }
1635 }
1636 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1637 let fields_list = self.table_info(&self.params.table.clone());
1638 let mut fields = vec![];
1639 let mut values = vec![];
1640 if !self.params.autoinc && data["id"].is_empty() {
1641 let thread_id = format!("{:?}", std::thread::current().id());
1642 let thread_num: u64 = thread_id
1643 .trim_start_matches("ThreadId(")
1644 .trim_end_matches(")")
1645 .parse()
1646 .unwrap_or(0);
1647 data["id"] = format!(
1648 "{:X}{:X}",
1649 Local::now().timestamp_nanos_opt().unwrap_or(0),
1650 thread_num
1651 )
1652 .into();
1653 }
1654 for (field, value) in data.entries() {
1655 fields.push(format!("\"{}\"", field));
1656
1657 if value.is_string() {
1658 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1659 continue;
1660 } else if value.is_array() {
1661 if self.params.json[field].is_empty() {
1662 let array = value
1663 .members()
1664 .map(|x| x.as_str().unwrap_or(""))
1665 .collect::<Vec<&str>>()
1666 .join(",");
1667 values.push(format!("'{}'", array.replace("'", "''")));
1668 } else {
1669 let json = value.to_string();
1670 let json = json.replace("'", "''");
1671 values.push(format!("'{json}'"));
1672 }
1673 continue;
1674 } else if value.is_object() {
1675 if self.params.json[field].is_empty() {
1676 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1677 } else {
1678 let json = value.to_string();
1679 let json = json.replace("'", "''");
1680 values.push(format!("'{json}'"));
1681 }
1682 continue;
1683 } else if value.is_number() {
1684 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1685 if col_type == "boolean" {
1686 let bool_val = value.as_i64().unwrap_or(0) != 0;
1687 values.push(format!("{bool_val}"));
1688 } else if col_type.contains("int") {
1689 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1690 } else {
1691 values.push(format!("{value}"));
1692 }
1693 continue;
1694 } else if value.is_boolean() || value.is_null() {
1695 values.push(format!("{value}"));
1696 continue;
1697 } else {
1698 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1699 continue;
1700 }
1701 }
1702
1703 let conflict_cols: Vec<String> = conflict_fields
1704 .iter()
1705 .map(|f| format!("\"{}\"", f))
1706 .collect();
1707
1708 let update_set: Vec<String> = fields
1709 .iter()
1710 .filter(|f| {
1711 let name = f.trim_matches('"');
1712 !conflict_fields.contains(&name) && name != "id"
1713 })
1714 .map(|f| format!("{f}=EXCLUDED.{f}"))
1715 .collect();
1716
1717 let fields_str = fields.join(",");
1718 let values_str = values.join(",");
1719
1720 let sql = format!(
1721 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1722 self.params.table,
1723 fields_str,
1724 values_str,
1725 conflict_cols.join(","),
1726 update_set.join(",")
1727 );
1728 if self.params.sql {
1729 return JsonValue::from(sql.clone());
1730 }
1731 let (state, result) = self.execute(sql.as_str());
1732 match state {
1733 true => match self.params.autoinc {
1734 true => result.clone(),
1735 false => data["id"].clone(),
1736 },
1737 false => {
1738 let thread_id = format!("{:?}", thread::current().id());
1739 error!("upsert失败: {thread_id} {result:?} {sql}");
1740 JsonValue::from("")
1741 }
1742 }
1743 }
1744 fn update(&mut self, data: JsonValue) -> JsonValue {
1745 let fields_list = self.table_info(&self.params.table.clone());
1746 let mut values = vec![];
1747 for (field, value) in data.entries() {
1748 if value.is_string() {
1749 values.push(format!(
1750 "\"{}\"='{}'",
1751 field,
1752 value.to_string().replace("'", "''")
1753 ));
1754 } else if value.is_number() {
1755 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1756 if col_type == "boolean" {
1757 let bool_val = value.as_i64().unwrap_or(0) != 0;
1758 values.push(format!("\"{field}\"= {bool_val}"));
1759 } else if col_type.contains("int") {
1760 values.push(format!(
1761 "\"{}\"= {}",
1762 field,
1763 value.as_f64().unwrap_or(0.0) as i64
1764 ));
1765 } else {
1766 values.push(format!("\"{field}\"= {value}"));
1767 }
1768 } else if value.is_array() {
1769 if self.params.json[field].is_empty() {
1770 let array = value
1771 .members()
1772 .map(|x| x.as_str().unwrap_or(""))
1773 .collect::<Vec<&str>>()
1774 .join(",");
1775 values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1776 } else {
1777 let json = value.to_string();
1778 let json = json.replace("'", "''");
1779 values.push(format!("\"{field}\"='{json}'"));
1780 }
1781 continue;
1782 } else if value.is_object() {
1783 if self.params.json[field].is_empty() {
1784 values.push(format!(
1785 "\"{}\"='{}'",
1786 field,
1787 value.to_string().replace("'", "''")
1788 ));
1789 } else {
1790 if value.is_empty() {
1791 values.push(format!("\"{field}\"=''"));
1792 continue;
1793 }
1794 let json = value.to_string();
1795 let json = json.replace("'", "''");
1796 values.push(format!("\"{field}\"='{json}'"));
1797 }
1798 continue;
1799 } else if value.is_boolean() || value.is_null() {
1800 values.push(format!("\"{field}\"= {value}"));
1801 } else {
1802 values.push(format!("\"{field}\"=\"{value}\""));
1803 }
1804 }
1805
1806 for (field, value) in self.params.inc_dec.entries() {
1807 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1808 }
1809 if !self.params.update_column.is_empty() {
1810 values.extend(self.params.update_column.clone());
1811 }
1812 let values = values.join(",");
1813
1814 let sql = format!(
1815 "UPDATE {} SET {} {};",
1816 self.params.table.clone(),
1817 values,
1818 self.params.where_sql()
1819 );
1820 if self.params.sql {
1821 return JsonValue::from(sql.clone());
1822 }
1823 let (state, data) = self.execute(sql.as_str());
1824 if state {
1825 data
1826 } else {
1827 let thread_id = format!("{:?}", thread::current().id());
1828 error!("update: {thread_id} {data:?} {sql}");
1829 0.into()
1830 }
1831 }
1832 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1833 let fields_list = self.table_info(&self.params.table.clone());
1834 let mut values = vec![];
1835
1836 let mut ids = vec![];
1837 for (field, _) in data[0].entries() {
1838 if field == "id" {
1839 continue;
1840 }
1841 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1842 let mut fields = vec![];
1843 for row in data.members() {
1844 let value = row[field].clone();
1845 let id = row["id"].clone();
1846 ids.push(id.clone());
1847 if value.is_string() {
1848 fields.push(format!(
1849 "WHEN '{}' THEN '{}'",
1850 id,
1851 value.to_string().replace("'", "''")
1852 ));
1853 } else if value.is_array() || value.is_object() {
1854 if self.params.json[field].is_empty() {
1855 fields.push(format!(
1856 "WHEN '{}' THEN '{}'",
1857 id,
1858 value.to_string().replace("'", "''")
1859 ));
1860 } else {
1861 let json = value.to_string();
1862 let json = json.replace("'", "''");
1863 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1864 }
1865 continue;
1866 } else if value.is_number() {
1867 if col_type == "boolean" {
1868 let bool_val = value.as_i64().unwrap_or(0) != 0;
1869 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1870 } else {
1871 fields.push(format!("WHEN '{id}' THEN {value}"));
1872 }
1873 } else if value.is_boolean() || value.is_null() {
1874 fields.push(format!("WHEN '{id}' THEN {value}"));
1875 } else {
1876 fields.push(format!(
1877 "WHEN '{}' THEN '{}'",
1878 id,
1879 value.to_string().replace("'", "''")
1880 ));
1881 }
1882 }
1883 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1884 }
1885 self.where_and("id", "in", ids.into());
1886 for (field, value) in self.params.inc_dec.entries() {
1887 values.push(format!("{} = {}", field, value.to_string().clone()));
1888 }
1889
1890 let values = values.join(",");
1891 let sql = format!(
1892 "UPDATE {} SET {} {} {};",
1893 self.params.table.clone(),
1894 values,
1895 self.params.where_sql(),
1896 self.params.page_limit_sql()
1897 );
1898 if self.params.sql {
1899 return JsonValue::from(sql.clone());
1900 }
1901 let (state, data) = self.execute(sql.as_str());
1902 if state {
1903 data
1904 } else {
1905 error!("update_all: {data:?}");
1906 JsonValue::from(0)
1907 }
1908 }
1909
1910 fn delete(&mut self) -> JsonValue {
1911 let sql = format!(
1912 "delete FROM {} {} {};",
1913 self.params.table.clone(),
1914 self.params.where_sql(),
1915 self.params.page_limit_sql()
1916 );
1917 if self.params.sql {
1918 return JsonValue::from(sql.clone());
1919 }
1920 let (state, data) = self.execute(sql.as_str());
1921 match state {
1922 true => data,
1923 false => {
1924 error!("delete 失败>>> {data:?}");
1925 JsonValue::from(0)
1926 }
1927 }
1928 }
1929
1930 fn transaction(&mut self) -> bool {
1931 let thread_id = format!("{:?}", thread::current().id());
1932 let key = format!("{}{}", self.default, thread_id);
1933
1934 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1935 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1936 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1937 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1938 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1939 return true;
1940 }
1941
1942 let mut conn = None;
1944 for attempt in 0..3u8 {
1945 match self.client.get_connect_for_transaction() {
1946 Ok(mut c) => {
1947 if c.is_valid() {
1948 conn = Some(c);
1949 break;
1950 }
1951 warn!("事务连接无效(第{}次)", attempt + 1);
1952 self.client.release_transaction_conn();
1953 }
1954 Err(e) => {
1955 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1956 }
1957 }
1958 if attempt < 2 {
1959 thread::sleep(std::time::Duration::from_millis(200));
1960 }
1961 }
1962 let mut conn = match conn {
1963 Some(c) => c,
1964 None => {
1965 error!("获取事务连接重试耗尽");
1966 return false;
1967 }
1968 };
1969
1970 if let Err(e) = conn.execute("START TRANSACTION") {
1971 error!("启动事务失败: {e}");
1972 self.client.release_transaction_conn();
1973 return false;
1974 }
1975
1976 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1977 true
1978 }
1979 fn commit(&mut self) -> bool {
1980 let thread_id = format!("{:?}", thread::current().id());
1981 let key = format!("{}{}", self.default, thread_id);
1982
1983 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1984 error!("commit: 没有活跃的事务");
1985 return false;
1986 }
1987
1988 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1989 if depth > 1 {
1990 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1991 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1992 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &key);
1993 return true;
1994 }
1995
1996 let commit_result =
1997 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1998
1999 let success = match commit_result {
2000 Some(Ok(_)) => true,
2001 Some(Err(e)) => {
2002 error!("提交事务失败: {e}");
2003 false
2004 }
2005 None => {
2006 error!("提交事务失败: 未找到连接");
2007 false
2008 }
2009 };
2010
2011 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &key) {
2012 self.client.release_transaction_conn_with_conn(conn);
2013 } else {
2014 self.client.release_transaction_conn();
2015 }
2016 success
2017 }
2018
2019 fn rollback(&mut self) -> bool {
2020 let thread_id = format!("{:?}", thread::current().id());
2021 let key = format!("{}{}", self.default, thread_id);
2022
2023 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
2024 error!("rollback: 没有活跃的事务");
2025 return false;
2026 }
2027
2028 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
2029 if depth > 1 {
2030 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
2031 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
2032 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &key);
2033 return true;
2034 }
2035
2036 let rollback_result =
2037 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
2038
2039 let success = match rollback_result {
2040 Some(Ok(_)) => true,
2041 Some(Err(e)) => {
2042 error!("回滚失败: {e}");
2043 false
2044 }
2045 None => {
2046 error!("回滚失败: 未找到连接");
2047 false
2048 }
2049 };
2050
2051 if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &key) {
2052 self.client.release_transaction_conn_with_conn(conn);
2053 } else {
2054 self.client.release_transaction_conn();
2055 }
2056 success
2057 }
2058
2059 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2060 let (state, data) = self.query(sql);
2061 match state {
2062 true => Ok(data),
2063 false => Err(data.to_string()),
2064 }
2065 }
2066
2067 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2068 self.params = Params::default(self.connection.mode.str().as_str());
2069 let (state, data) = self.execute(sql);
2070 match state {
2071 true => Ok(data),
2072 false => Err(data.to_string()),
2073 }
2074 }
2075
2076 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2077 self.params.inc_dec[field] = format!("{field} + {num}").into();
2078 self
2079 }
2080
2081 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2082 self.params.inc_dec[field] = format!("{field} - {num}").into();
2083 self
2084 }
2085 fn buildsql(&mut self) -> String {
2086 self.fetch_sql();
2087 let sql = self.select().to_string();
2088 format!("( {} ) {}", sql, self.params.table)
2089 }
2090
2091 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2092 for field in fields {
2093 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2094 }
2095 self
2096 }
2097
2098 fn join(
2099 &mut self,
2100 main_table: &str,
2101 main_fields: &str,
2102 right_table: &str,
2103 right_fields: &str,
2104 ) -> &mut Self {
2105 let main_table = if main_table.is_empty() {
2106 self.params.table.clone()
2107 } else {
2108 main_table.to_string()
2109 };
2110 self.params.join_table = right_table.to_string();
2111 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2112 self
2113 }
2114
2115 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2116 let main_fields = if main_fields.is_empty() {
2117 "id"
2118 } else {
2119 main_fields
2120 };
2121 let second_fields = if second_fields.is_empty() {
2122 self.params.table.clone()
2123 } else {
2124 second_fields.to_string().clone()
2125 };
2126 let sec_table_name = format!("{}{}", table, "_2");
2127 let second_table = format!("{} {}", table, sec_table_name.clone());
2128 self.params.join_table = sec_table_name.clone();
2129 self.params.join.push(format!(
2130 " INNER JOIN {} ON {}.{} = {}.{}",
2131 second_table, self.params.table, main_fields, sec_table_name, second_fields
2132 ));
2133 self
2134 }
2135
2136 fn join_right(
2137 &mut self,
2138 main_table: &str,
2139 main_fields: &str,
2140 right_table: &str,
2141 right_fields: &str,
2142 ) -> &mut Self {
2143 let main_table = if main_table.is_empty() {
2144 self.params.table.clone()
2145 } else {
2146 main_table.to_string()
2147 };
2148 self.params.join_table = right_table.to_string();
2149 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2150 self
2151 }
2152
2153 fn join_full(
2154 &mut self,
2155 main_table: &str,
2156 main_fields: &str,
2157 right_table: &str,
2158 right_fields: &str,
2159 ) -> &mut Self {
2160 let main_table = if main_table.is_empty() {
2161 self.params.table.clone()
2162 } else {
2163 main_table.to_string()
2164 };
2165 self.params.join_table = right_table.to_string();
2166 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2167 self
2168 }
2169
2170 fn union(&mut self, sub_sql: &str) -> &mut Self {
2171 self.params.unions.push(format!("UNION {sub_sql}"));
2172 self
2173 }
2174
2175 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2176 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2177 self
2178 }
2179
2180 fn lock_for_update(&mut self) -> &mut Self {
2181 self.params.lock_mode = "FOR UPDATE".to_string();
2182 self
2183 }
2184
2185 fn lock_for_share(&mut self) -> &mut Self {
2186 self.params.lock_mode = "FOR SHARE".to_string();
2187 self
2188 }
2189}