1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
/// PostgreSQL backend handle bundling connection settings, query-builder
/// state and the connection pool.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Identifier that is combined with the current thread id to form the
    /// key used to look up an open transaction.
    pub default: String,
    /// Query-builder state accumulated by the chained builder methods.
    pub params: Params,
    /// Pool handle from which non-transactional statements obtain a connection.
    pub client: Pools,
}
20
21impl Pgsql {
22 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23 let port = connection
24 .hostport
25 .parse::<i32>()
26 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28 let cp_connection = connection.clone();
29 let config = object! {
30 debug: cp_connection.debug,
31 username: cp_connection.username,
32 userpass: cp_connection.userpass,
33 database: cp_connection.database,
34 hostname: cp_connection.hostname,
35 hostport: port,
36 charset: cp_connection.charset.str(),
37 };
38 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40 let pools = pgsql.pools()?;
41 Ok(Self {
42 connection,
43 default: default.clone(),
44 params: Params::default("pgsql"),
45 client: pools,
46 })
47 }
48
    /// Runs a row-returning statement and reports `(success, payload)`.
    ///
    /// On success the payload is the returned rows; on failure it is the error
    /// message as a JSON string.
    ///
    /// If the transaction manager reports an open transaction for this
    /// handle/thread key, the statement runs on that transaction's connection
    /// with no retry. Otherwise a pooled connection is used, with up to three
    /// attempts for pool-acquisition failures and retriable errors.
    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
        let thread_id = format!("{:?}", thread::current().id());
        // Transaction lookup key: handle identifier + current thread id.
        let key = format!("{}{}", self.default, thread_id);

        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));

            match result {
                Some(Ok(e)) => {
                    if self.connection.debug {
                        info!("查询成功: {} {}", thread_id.clone(), sql);
                    }
                    (true, e.rows)
                }
                Some(Err(e)) => {
                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
                    (false, JsonValue::from(e.to_string()))
                }
                // `with_conn` produced no result: no connection is registered under `key`.
                None => {
                    error!("事务查询失败: 未找到事务连接 {thread_id}");
                    (false, JsonValue::from("未找到事务连接"))
                }
            }
        } else {
            // Backoff before the 2nd and 3rd attempts; the last entry is never
            // read because the final attempt does not sleep.
            const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
            let mut last_err = String::new();
            for attempt in 0..3u8 {
                let mut guard = match self.client.get_guard() {
                    Ok(g) => g,
                    Err(e) => {
                        // Last attempt: give up and surface the pool error.
                        if attempt == 2 {
                            error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
                            return (false, JsonValue::from(e.to_string()));
                        }
                        let backoff = RETRY_BACKOFF_MS[attempt as usize];
                        warn!(
                            "非事务查询 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
                            attempt + 1,
                            backoff
                        );
                        thread::sleep(std::time::Duration::from_millis(backoff));
                        continue;
                    }
                };
                match guard.conn().query(sql) {
                    Ok(e) => {
                        if self.connection.debug {
                            info!("查询成功: {} {}", thread_id.clone(), sql);
                        }
                        return (true, e.rows);
                    }
                    Err(ref e) if Self::is_retriable_error(e) => {
                        last_err = e.to_string();
                        // Discard the guard's connection before the next attempt
                        // (presumably so the broken connection is not reused — confirm in br_pgsql).
                        guard.discard();
                        if attempt < 2 {
                            let backoff = RETRY_BACKOFF_MS[attempt as usize];
                            warn!(
                                "非事务查询连接断开(第{}次, {}ms后重试): {thread_id} {e}",
                                attempt + 1,
                                backoff
                            );
                            thread::sleep(std::time::Duration::from_millis(backoff));
                        } else {
                            warn!(
                                "非事务查询连接断开(第{}次, 不再重试): {thread_id} {e}",
                                attempt + 1
                            );
                        }
                    }
                    // Non-retriable errors fail immediately.
                    Err(e) => {
                        error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
                        return (false, JsonValue::from(e.to_string()));
                    }
                }
            }
            // Reached only when every attempt ended in a retriable error.
            error!("非事务查询重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
            (false, JsonValue::from(last_err))
        }
    }
    /// Runs a data-modifying statement and reports `(success, payload)`.
    ///
    /// On success the payload is the returned rows when the SQL text contains
    /// the substring `INSERT`, otherwise the affected-row count. On failure it
    /// is the error message as a JSON string.
    ///
    /// If the transaction manager reports an open transaction for this
    /// handle/thread key, the statement runs on that transaction's connection
    /// with no retry. Otherwise a pooled connection is used, with up to three
    /// attempts for pool-acquisition failures and retriable errors.
    ///
    /// NOTE(review): a statement that fails with a retriable error is sent
    /// again; whether the first attempt may already have taken effect on the
    /// server cannot be told from this code — confirm this is acceptable for
    /// non-idempotent writes.
    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
        let thread_id = format!("{:?}", thread::current().id());
        // Transaction lookup key: handle identifier + current thread id.
        let key = format!("{}{}", self.default, thread_id);

        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));

            match result {
                Some(Ok(e)) => {
                    if self.connection.debug {
                        info!("提交成功: {} {}", thread_id.clone(), sql);
                    }
                    // NOTE(review): case-sensitive substring test; it also matches
                    // `INSERT` appearing inside a literal of another statement.
                    if sql.contains("INSERT") {
                        (true, e.rows)
                    } else {
                        (true, e.affect_count.into())
                    }
                }
                Some(Err(e)) => {
                    error!("事务提交失败: {thread_id} {e}");
                    (false, JsonValue::from(e.to_string()))
                }
                // `with_conn` produced no result: no connection is registered under `key`.
                None => {
                    error!("事务执行失败: 未找到事务连接 {thread_id}");
                    (false, JsonValue::from("未找到事务连接"))
                }
            }
        } else {
            // Backoff before the 2nd and 3rd attempts; the last entry is never
            // read because the final attempt does not sleep.
            const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
            let mut last_err = String::new();
            for attempt in 0..3u8 {
                let mut guard = match self.client.get_guard() {
                    Ok(g) => g,
                    Err(e) => {
                        // Last attempt: give up and surface the pool error.
                        if attempt == 2 {
                            error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
                            return (false, JsonValue::from(e.to_string()));
                        }
                        let backoff = RETRY_BACKOFF_MS[attempt as usize];
                        warn!(
                            "非事务执行 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
                            attempt + 1,
                            backoff
                        );
                        thread::sleep(std::time::Duration::from_millis(backoff));
                        continue;
                    }
                };
                match guard.conn().execute(sql) {
                    Ok(e) => {
                        if self.connection.debug {
                            info!("提交成功: {} {}", thread_id.clone(), sql);
                        }
                        if sql.contains("INSERT") {
                            return (true, e.rows);
                        } else {
                            return (true, e.affect_count.into());
                        }
                    }
                    Err(ref e) if Self::is_retriable_error(e) => {
                        last_err = e.to_string();
                        // Discard the guard's connection before the next attempt
                        // (presumably so the broken connection is not reused — confirm in br_pgsql).
                        guard.discard();
                        if attempt < 2 {
                            let backoff = RETRY_BACKOFF_MS[attempt as usize];
                            warn!(
                                "非事务执行连接断开(第{}次, {}ms后重试): {thread_id} {e}",
                                attempt + 1,
                                backoff
                            );
                            thread::sleep(std::time::Duration::from_millis(backoff));
                        } else {
                            warn!(
                                "非事务执行连接断开(第{}次, 不再重试): {thread_id} {e}",
                                attempt + 1
                            );
                        }
                    }
                    // Non-retriable errors fail immediately.
                    Err(e) => {
                        error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
                        return (false, JsonValue::from(e.to_string()));
                    }
                }
            }
            // Reached only when every attempt ended in a retriable error.
            error!("非事务执行重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
            (false, JsonValue::from(last_err))
        }
    }
218
219 fn is_retriable_error(e: &PgsqlError) -> bool {
221 matches!(
222 e,
223 PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
224 )
225 }
226}
227
228impl DbMode for Pgsql {
229 fn database_tables(&mut self) -> JsonValue {
230 let sql = "SHOW TABLES".to_string();
231 match self.sql(sql.as_str()) {
232 Ok(e) => {
233 let mut list = vec![];
234 for item in e.members() {
235 for (_, value) in item.entries() {
236 list.push(value.clone());
237 }
238 }
239 list.into()
240 }
241 Err(_) => {
242 array![]
243 }
244 }
245 }
246
247 fn database_create(&mut self, name: &str) -> bool {
248 let sql = format!("CREATE DATABASE {name}");
249
250 let (state, data) = self.execute(sql.as_str());
251 match state {
252 true => data.as_bool().unwrap_or(true),
253 false => {
254 error!("创建数据库失败: {data:?}");
255 false
256 }
257 }
258 }
259
260 fn truncate(&mut self, table: &str) -> bool {
261 let sql = format!("TRUNCATE TABLE {table}");
262 let (state, _) = self.execute(sql.as_str());
263 state
264 }
265}
266
267impl Mode for Pgsql {
268 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
269 let mut sql = String::new();
270 let mut comments = vec![];
271
272 if !options.table_unique.is_empty() {
273 let full_name = format!(
274 "{}_unique_{}",
275 options.table_name,
276 options.table_unique.join("_")
277 );
278 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
279 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
280 let unique = format!(
281 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
282 name,
283 options.table_name,
284 options.table_unique.join(",")
285 );
286 comments.push(unique);
287 }
288
289 for row in options.table_index.iter() {
290 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
291 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
292 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
293 let index = format!(
294 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
295 name,
296 options.table_name,
297 row.join(",")
298 );
299 comments.push(index);
300 }
301
302 for (name, field) in options.table_fields.entries_mut() {
303 field["table_name"] = options.table_name.clone().into();
304 let row = br_fields::field("pgsql", name, field.clone());
305 let (col_sql, meta) = if let Some(idx) = row.find("--") {
306 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
307 } else {
308 (row.trim(), None)
309 };
310 if let Some(meta) = meta {
311 comments.push(format!(
312 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
313 options.table_name, name, meta
314 ));
315 }
316 sql = format!("{} {},\r\n", sql, col_sql);
317 }
318
319 let primary_key = format!(
320 "CONSTRAINT {}_{} PRIMARY KEY ({})",
321 options.table_name, options.table_key, options.table_key
322 );
323 let sql = format!(
324 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
325 options.table_name,
326 sql.trim_end_matches(",\r\n"),
327 primary_key
328 );
329 comments.insert(0, sql);
330
331 for (_name, field) in options.table_fields.entries() {
332 let _ = field["mode"].as_str();
333 }
334
335 if self.params.sql {
336 let info = comments.join("\r\n");
337 return JsonValue::from(info);
338 }
339 for comment in comments {
340 let (state, _) = self.execute(comment.as_str());
341 match state {
342 true => {}
343 false => {
344 return JsonValue::from(state);
345 }
346 }
347 }
348 JsonValue::from(true)
349 }
350
351 fn table_update(&mut self, options: TableOptions) -> JsonValue {
352 let fields_list = self.table_info(&options.table_name);
353 let mut put = vec![];
354 let mut add = vec![];
355 let mut del = vec![];
356 let mut comments = vec![];
357
358 for (key, _) in fields_list.entries() {
359 if options.table_fields[key].is_empty() {
360 del.push(key);
361 }
362 }
363 for (name, field) in options.table_fields.entries() {
364 if !fields_list[name].is_empty() {
365 let old_info = &fields_list[name];
366 let new_field_sql = br_fields::field("pgsql", name, field.clone());
367
368 let old_comment = old_info["comment"].as_str().unwrap_or("");
369 let old_type = old_info["type"].as_str().unwrap_or("");
370
371 let new_comment = if let Some(idx) = new_field_sql.find("--") {
372 new_field_sql[idx + 2..].trim()
373 } else {
374 ""
375 };
376
377 let comment_matches =
378 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
379 let old_parts: Vec<&str> = old_comment.split('|').collect();
380 let new_parts: Vec<&str> = new_comment.split('|').collect();
381 if old_parts.len() >= 4 && new_parts.len() >= 4 {
382 old_parts[..4] == new_parts[..4]
383 } else {
384 old_comment == new_comment
385 }
386 } else if !old_comment.is_empty() && !new_comment.is_empty() {
387 let old_parts: Vec<&str> = old_comment.split('|').collect();
388 let new_parts: Vec<&str> = new_comment.split('|').collect();
389 if old_parts.len() >= 2
390 && new_parts.len() >= 2
391 && old_parts.len() == new_parts.len()
392 {
393 let old_filtered: Vec<&str> = old_parts
394 .iter()
395 .filter(|v| **v != "true" && **v != "false")
396 .copied()
397 .collect();
398 let new_filtered: Vec<&str> = new_parts
399 .iter()
400 .filter(|v| **v != "true" && **v != "false")
401 .copied()
402 .collect();
403 old_filtered == new_filtered
404 } else {
405 old_comment == new_comment
406 }
407 } else {
408 old_comment == new_comment
409 };
410
411 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
412 let new_type = if sql_parts.len() > 1 {
413 sql_parts[1].to_lowercase()
414 } else {
415 String::new()
416 };
417
418 let type_matches = match old_type {
419 "integer" => {
420 new_type.contains("int")
421 && !new_type.contains("bigint")
422 && !new_type.contains("smallint")
423 }
424 "bigint" => new_type.contains("bigint"),
425 "smallint" => new_type.contains("smallint"),
426 "boolean" => new_type.contains("boolean"),
427 "text" => new_type.contains("text"),
428 "character varying" => new_type.contains("varchar"),
429 "character" => new_type.contains("char") && !new_type.contains("varchar"),
430 "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
431 "double precision" => {
432 new_type.contains("double") || new_type.contains("float8")
433 }
434 "real" => new_type.contains("real") || new_type.contains("float4"),
435 "timestamp without time zone" | "timestamp with time zone" => {
436 new_type.contains("timestamp")
437 }
438 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
439 "time without time zone" | "time with time zone" => {
440 new_type.contains("time") && !new_type.contains("timestamp")
441 }
442 "json" | "jsonb" => new_type.contains("json"),
443 "uuid" => new_type.contains("uuid"),
444 "bytea" => new_type.contains("bytea"),
445 _ => old_type == new_type,
446 };
447
448 if type_matches && comment_matches {
449 continue;
450 }
451
452 log::debug!(
453 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
454 options.table_name,
455 name,
456 type_matches,
457 old_type,
458 new_type,
459 comment_matches
460 );
461 put.push(name);
462 } else {
463 add.push(name);
464 }
465 }
466
467 for name in add.iter() {
468 let name = name.to_string();
469 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
470 let rows = row.split("--").collect::<Vec<&str>>();
471 comments.push(format!(
472 r#"ALTER TABLE "{}" add {};"#,
473 options.table_name,
474 rows[0].trim()
475 ));
476 if rows.len() > 1 {
477 comments.push(format!(
478 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
479 options.table_name,
480 name,
481 rows[1].trim()
482 ));
483 }
484 }
485 for name in del.iter() {
486 comments.push(format!(
487 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
488 options.table_name, name
489 ));
490 }
491 for name in put.iter() {
492 let name = name.to_string();
493 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
494 let rows = row.split("--").collect::<Vec<&str>>();
495
496 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
497
498 if sql[1].contains("BOOLEAN") {
499 let text = format!(
500 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
501 options.table_name, name
502 );
503 comments.push(text.clone());
504 let text = format!(
505 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
506 options.table_name, name, sql[1]
507 );
508 comments.push(text.clone());
509 } else {
510 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
511 let new_type_lower = sql[1].to_lowercase();
512 let is_date_to_numeric = (old_col_type == "date"
513 || old_col_type.contains("timestamp"))
514 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
515 if is_date_to_numeric {
516 comments.push(format!(
517 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
518 options.table_name, name
519 ));
520 comments.push(format!(
521 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
522 options.table_name, name, sql[1], name, name, name
523 ));
524 } else {
525 let text = format!(
526 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
527 options.table_name, name, sql[1]
528 );
529 comments.push(text.clone());
530 }
531 };
532
533 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
534 let default_value = rows[0][default_pos + 9..].trim();
535 if !default_value.is_empty() {
536 comments.push(format!(
537 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
538 options.table_name, name, default_value
539 ));
540 }
541 }
542 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
545 .as_str()
546 .unwrap_or("YES");
547 let old_is_required = old_is_nullable == "NO";
548
549 if old_is_required && name != options.table_key {
551 comments.push(format!(
552 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
553 options.table_name, name
554 ));
555 }
556
557 if rows.len() > 1 {
558 comments.push(format!(
559 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
560 options.table_name,
561 name,
562 rows[1].trim()
563 ));
564 }
565 }
566
567 let mut unique_new = vec![];
568 let mut index_new = vec![];
569 let mut primary_key = vec![];
570 let (_, index_list) = self.query(
571 format!(
572 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
573 options.table_name
574 )
575 .as_str(),
576 );
577 for item in index_list.members() {
578 let key_name = item["indexname"].as_str().unwrap_or("");
579 let indexdef = item["indexdef"].to_string();
580
581 if indexdef.contains(
582 format!(
583 "CREATE UNIQUE INDEX {}_{} ON",
584 options.table_name, options.table_key
585 )
586 .as_str(),
587 ) {
588 primary_key.push(key_name.to_string());
589 continue;
590 }
591 if indexdef.contains("CREATE UNIQUE INDEX") {
592 unique_new.push(key_name.to_string());
593 continue;
594 }
595 if indexdef.contains("CREATE INDEX") {
596 index_new.push(key_name.to_string());
597 continue;
598 }
599 }
600
601 if !options.table_unique.is_empty() {
602 let full_name = format!(
603 "{}_unique_{}",
604 options.table_name,
605 options.table_unique.join("_")
606 );
607 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
608 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
609 let unique = format!(
610 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
611 name,
612 options.table_name,
613 options.table_unique.join(",")
614 );
615 if !unique_new.contains(&name) {
616 comments.push(unique);
617 }
618 unique_new.retain(|x| *x != name);
619 }
620
621 for row in options.table_index.iter() {
622 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
623 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
624 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
625 let index = format!(
626 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
627 name,
628 options.table_name,
629 row.join(",")
630 );
631 if !index_new.contains(&name) {
632 comments.push(index);
633 }
634 index_new.retain(|x| *x != name);
635 }
636
637 for item in unique_new {
638 if item.ends_with("_pkey") {
639 continue;
640 }
641 if item.starts_with("unique_") {
642 comments.push(format!(
643 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
644 options.table_name,
645 item.clone()
646 ));
647 } else {
648 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
649 }
650 }
651 for item in index_new {
652 if item.ends_with("_pkey") {
653 continue;
654 }
655 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
656 }
657
658 if self.params.sql {
659 return JsonValue::from(comments.join(""));
660 }
661
662 if comments.is_empty() {
663 return JsonValue::from(-1);
664 }
665
666 for item in comments.iter() {
667 let (state, res) = self.execute(item.as_str());
668 match state {
669 true => {}
670 false => {
671 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
672 return JsonValue::from(0);
673 }
674 }
675 }
676 JsonValue::from(1)
677 }
678
679 fn table_info(&mut self, table: &str) -> JsonValue {
680 let sql = format!(
681 "SELECT COL.COLUMN_NAME,
682 COL.DATA_TYPE,
683 COL.IS_NULLABLE,
684 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
685 LEFT JOIN
686 pg_catalog.pg_description DESCRIPTION
687 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
688 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
689 let (state, data) = self.query(sql.as_str());
690 let mut list = object! {};
691 if state {
692 for item in data.members() {
693 let mut row = object! {};
694 row["field"] = item["column_name"].clone();
695 row["comment"] = item["comment"].clone();
696 row["type"] = item["data_type"].clone();
697 row["is_nullable"] = item["is_nullable"].clone();
698 if let Some(field_name) = row["field"].as_str() {
699 list[field_name] = row.clone();
700 }
701 }
702 list
703 } else {
704 list
705 }
706 }
707
708 fn table_is_exist(&mut self, name: &str) -> bool {
709 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
710 let (state, data) = self.query(sql.as_str());
711 match state {
712 true => {
713 for item in data.members() {
714 if item.has_key("exists") {
715 return item["exists"].as_bool().unwrap_or(false);
716 }
717 }
718 false
719 }
720 false => false,
721 }
722 }
723
724 fn table(&mut self, name: &str) -> &mut Pgsql {
725 self.params = Params::default(self.connection.mode.str().as_str());
726 let table_name = format!("{}{}", self.connection.prefix, name);
727 if !super::sql_safety::validate_table_name(&table_name) {
728 error!("Invalid table name: {}", name);
729 }
730 self.params.table = table_name.clone();
731 self.params.join_table = table_name;
732 self
733 }
734
735 fn change_table(&mut self, name: &str) -> &mut Self {
736 self.params.join_table = name.to_string();
737 self
738 }
739
740 fn autoinc(&mut self) -> &mut Self {
741 self.params.autoinc = true;
742 self
743 }
744
745 fn timestamps(&mut self) -> &mut Self {
746 self.params.timestamps = true;
747 self
748 }
749
750 fn fetch_sql(&mut self) -> &mut Self {
751 self.params.sql = true;
752 self
753 }
754
755 fn order(&mut self, field: &str, by: bool) -> &mut Self {
756 self.params.order[field] = {
757 if by {
758 "DESC"
759 } else {
760 "ASC"
761 }
762 }
763 .into();
764 self
765 }
766
767 fn group(&mut self, field: &str) -> &mut Self {
768 let fields: Vec<&str> = field.split(",").collect();
769 for field in fields.iter() {
770 let field = field.to_string();
771 self.params.group[field.as_str()] = field.clone().into();
772 self.params.fields[field.as_str()] = field.clone().into();
773 }
774 self
775 }
776
777 fn distinct(&mut self) -> &mut Self {
778 self.params.distinct = true;
779 self
780 }
781
782 fn json(&mut self, field: &str) -> &mut Self {
783 let list: Vec<&str> = field.split(",").collect();
784 for item in list.iter() {
785 self.params.json[item.to_string().as_str()] = item.to_string().into();
786 }
787 self
788 }
789
790 fn location(&mut self, field: &str) -> &mut Self {
791 let list: Vec<&str> = field.split(",").collect();
792 for item in list.iter() {
793 self.params.location[item.to_string().as_str()] = item.to_string().into();
794 }
795 self
796 }
797
798 fn field(&mut self, field: &str) -> &mut Self {
799 let list: Vec<&str> = field.split(",").collect();
800 let join_table = if self.params.join_table.is_empty() {
801 self.params.table.clone()
802 } else {
803 self.params.join_table.clone()
804 };
805 for item in list.iter() {
806 let lower = item.to_lowercase();
807 let is_expr = lower.contains("count(")
808 || lower.contains("sum(")
809 || lower.contains("avg(")
810 || lower.contains("max(")
811 || lower.contains("min(")
812 || lower.contains("case ");
813 if is_expr {
814 self.params.fields[item.to_string().as_str()] = (*item).into();
815 } else if item.contains(" as ") {
816 let text = item.split(" as ").collect::<Vec<&str>>();
817 self.params.fields[item.to_string().as_str()] =
818 format!("{}.{} as {}", join_table, text[0], text[1]).into();
819 } else {
820 self.params.fields[item.to_string().as_str()] =
821 format!("{join_table}.{item}").into();
822 }
823 }
824 self
825 }
826
827 fn field_raw(&mut self, expr: &str) -> &mut Self {
828 self.params.fields[expr] = expr.into();
829 self
830 }
831
832 fn hidden(&mut self, name: &str) -> &mut Self {
833 let hidden: Vec<&str> = name.split(",").collect();
834
835 let fields_list = self.table_info(self.params.clone().table.as_str());
836 let mut data = array![];
837 for item in fields_list.members() {
838 let _ = data.push(object! {
839 "name":item["field"].as_str().unwrap_or("")
840 });
841 }
842
843 for item in data.members() {
844 let name = item["name"].as_str().unwrap_or("");
845 if !hidden.contains(&name) {
846 self.params.fields[name] = name.into();
847 }
848 }
849 self
850 }
851
852 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
853 for f in field.split('|') {
854 if !super::sql_safety::validate_field_name(f) {
855 error!("Invalid field name: {}", f);
856 }
857 }
858 if !super::sql_safety::validate_compare_orator(compare) {
859 error!("Invalid compare operator: {}", compare);
860 }
861 let join_table = if self.params.join_table.is_empty() {
862 self.params.table.clone()
863 } else {
864 self.params.join_table.clone()
865 };
866 if value.is_boolean() {
867 let bool_val = value.as_bool().unwrap_or(false);
868 self.params
869 .where_and
870 .push(format!("{join_table}.{field} {compare} {bool_val}"));
871 return self;
872 }
873 match compare {
874 "between" => {
875 self.params.where_and.push(format!(
876 "{}.{} between '{}' AND '{}'",
877 join_table, field, value[0], value[1]
878 ));
879 }
880 "set" => {
881 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
882 let mut wheredata = vec![];
883 for item in list.iter() {
884 wheredata.push(format!(
885 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
886 ));
887 }
888 self.params
889 .where_and
890 .push(format!("({})", wheredata.join(" or ")));
891 }
892 "notin" => {
893 let mut text = String::new();
894 for item in value.members() {
895 text = format!("{text},'{item}'");
896 }
897 text = text.trim_start_matches(",").into();
898 self.params
899 .where_and
900 .push(format!("{join_table}.{field} not in ({text})"));
901 }
902 "is" => {
903 self.params
904 .where_and
905 .push(format!("{join_table}.{field} is {value}"));
906 }
907 "isnot" => {
908 self.params
909 .where_and
910 .push(format!("{join_table}.{field} is not {value}"));
911 }
912 "notlike" => {
913 self.params
914 .where_and
915 .push(format!("{join_table}.{field} not like '{value}'"));
916 }
917 "in" => {
918 if value.is_array() && value.is_empty() {
919 self.params.where_and.push("1=0".to_string());
920 return self;
921 }
922 let mut text = String::new();
923 if value.is_array() {
924 for item in value.members() {
925 text = format!("{text},'{item}'");
926 }
927 } else if value.is_null() {
928 text = format!("{text},null");
929 } else {
930 let value = value.as_str().unwrap_or("");
931
932 let value: Vec<&str> = value.split(",").collect();
933 for item in value.iter() {
934 text = format!("{text},'{item}'");
935 }
936 }
937 text = text.trim_start_matches(",").into();
938
939 self.params
940 .where_and
941 .push(format!("{join_table}.{field} {compare} ({text})"));
942 }
943 "json_contains" => {
947 if value.is_array() {
948 if value.is_empty() {
949 self.params.where_and.push("1=0".to_string());
950 } else {
951 let mut parts = vec![];
952 for item in value.members() {
953 let escaped = super::sql_safety::escape_string(&item.to_string());
954 parts.push(format!(
955 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
956 escaped
957 ));
958 }
959 self.params
960 .where_and
961 .push(format!("({})", parts.join(" OR ")));
962 }
963 } else {
964 let escaped = super::sql_safety::escape_string(&value.to_string());
965 self.params.where_and.push(format!(
966 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
967 escaped
968 ));
969 }
970 }
971 _ => {
972 self.params
973 .where_and
974 .push(format!("{join_table}.{field} {compare} '{value}'"));
975 }
976 }
977 self
978 }
979
980 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
981 for f in field.split('|') {
982 if !super::sql_safety::validate_field_name(f) {
983 error!("Invalid field name: {}", f);
984 }
985 }
986 if !super::sql_safety::validate_compare_orator(compare) {
987 error!("Invalid compare operator: {}", compare);
988 }
989 let join_table = if self.params.join_table.is_empty() {
990 self.params.table.clone()
991 } else {
992 self.params.join_table.clone()
993 };
994
995 if value.is_boolean() {
996 let bool_val = value.as_bool().unwrap_or(false);
997 self.params
998 .where_or
999 .push(format!("{join_table}.{field} {compare} {bool_val}"));
1000 return self;
1001 }
1002
1003 match compare {
1004 "between" => {
1005 self.params.where_or.push(format!(
1006 "{}.{} between '{}' AND '{}'",
1007 join_table, field, value[0], value[1]
1008 ));
1009 }
1010 "set" => {
1011 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1012 let mut wheredata = vec![];
1013 for item in list.iter() {
1014 wheredata.push(format!(
1015 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1016 ));
1017 }
1018 self.params
1019 .where_or
1020 .push(format!("({})", wheredata.join(" or ")));
1021 }
1022 "notin" => {
1023 let mut text = String::new();
1024 for item in value.members() {
1025 text = format!("{text},'{item}'");
1026 }
1027 text = text.trim_start_matches(",").into();
1028 self.params
1029 .where_or
1030 .push(format!("{join_table}.{field} not in ({text})"));
1031 }
1032 "is" => {
1033 self.params
1034 .where_or
1035 .push(format!("{join_table}.{field} is {value}"));
1036 }
1037 "isnot" => {
1038 self.params
1039 .where_or
1040 .push(format!("{join_table}.{field} is not {value}"));
1041 }
1042 "in" => {
1043 if value.is_array() && value.is_empty() {
1044 self.params.where_or.push("1=0".to_string());
1045 return self;
1046 }
1047 let mut text = String::new();
1048 if value.is_array() {
1049 for item in value.members() {
1050 text = format!("{text},'{item}'");
1051 }
1052 } else {
1053 let value = value.as_str().unwrap_or("");
1054 let value: Vec<&str> = value.split(",").collect();
1055 for item in value.iter() {
1056 text = format!("{text},'{item}'");
1057 }
1058 }
1059 text = text.trim_start_matches(",").into();
1060 self.params
1061 .where_or
1062 .push(format!("{join_table}.{field} {compare} ({text})"));
1063 }
1064 "json_contains" => {
1068 if value.is_array() {
1069 if value.is_empty() {
1070 self.params.where_or.push("1=0".to_string());
1071 } else {
1072 let mut parts = vec![];
1073 for item in value.members() {
1074 let escaped = super::sql_safety::escape_string(&item.to_string());
1075 parts.push(format!(
1076 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1077 escaped
1078 ));
1079 }
1080 self.params
1081 .where_or
1082 .push(format!("({})", parts.join(" OR ")));
1083 }
1084 } else {
1085 let escaped = super::sql_safety::escape_string(&value.to_string());
1086 self.params.where_or.push(format!(
1087 "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1088 escaped
1089 ));
1090 }
1091 }
1092 _ => {
1093 self.params
1094 .where_or
1095 .push(format!("{join_table}.{field} {compare} '{value}'"));
1096 }
1097 }
1098 self
1099 }
1100
1101 fn where_raw(&mut self, expr: &str) -> &mut Self {
1102 self.params.where_and.push(expr.to_string());
1103 self
1104 }
1105
1106 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1107 self.params
1108 .where_and
1109 .push(format!("\"{field}\" IN ({sub_sql})"));
1110 self
1111 }
1112
1113 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1114 self.params
1115 .where_and
1116 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1117 self
1118 }
1119
1120 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1121 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1122 self
1123 }
1124
1125 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1126 self.params
1127 .where_and
1128 .push(format!("NOT EXISTS ({sub_sql})"));
1129 self
1130 }
1131
1132 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1133 self.params.where_column = format!(
1134 "{}.{} {} {}.{}",
1135 self.params.table, field_a, compare, self.params.table, field_b
1136 );
1137 self
1138 }
1139
1140 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1141 self.params
1142 .update_column
1143 .push(format!("{field_a} = {compare}"));
1144 self
1145 }
1146
1147 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1148 self.params.page = page;
1149 self.params.limit = limit;
1150 self
1151 }
1152
1153 fn limit(&mut self, count: i32) -> &mut Self {
1154 self.params.limit_only = count;
1155 self
1156 }
1157
1158 fn column(&mut self, field: &str) -> JsonValue {
1159 self.field(field);
1160 let sql = self.params.select_sql();
1161
1162 if self.params.sql {
1163 return JsonValue::from(sql);
1164 }
1165 let (state, data) = self.query(sql.as_str());
1166 match state {
1167 true => {
1168 let mut list = array![];
1169 for item in data.members() {
1170 if self.params.json[field].is_empty() {
1171 let _ = list.push(item[field].clone());
1172 } else {
1173 let data =
1174 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1175 let _ = list.push(data);
1176 }
1177 }
1178 list
1179 }
1180 false => {
1181 array![]
1182 }
1183 }
1184 }
1185
1186 fn count(&mut self) -> JsonValue {
1187 self.params.fields = json::object! {};
1188 self.params.fields["count"] = "count(*) as count".to_string().into();
1189 let sql = self.params.select_sql();
1190 if self.params.sql {
1191 return JsonValue::from(sql.clone());
1192 }
1193 let (state, data) = self.query(sql.as_str());
1194 if state {
1195 data[0]["count"].clone()
1196 } else {
1197 JsonValue::from(0)
1198 }
1199 }
1200
1201 fn max(&mut self, field: &str) -> JsonValue {
1202 self.params.fields[field] = format!("max({field}) as {field}").into();
1203 let sql = self.params.select_sql();
1204 if self.params.sql {
1205 return JsonValue::from(sql.clone());
1206 }
1207 let (state, data) = self.query(sql.as_str());
1208 if state {
1209 if data.len() > 1 {
1210 return data.clone();
1211 }
1212 data[0][field].clone()
1213 } else {
1214 JsonValue::from(0)
1215 }
1216 }
1217
1218 fn min(&mut self, field: &str) -> JsonValue {
1219 self.params.fields[field] = format!("min({field}) as {field}").into();
1220 let sql = self.params.select_sql();
1221 if self.params.sql {
1222 return JsonValue::from(sql.clone());
1223 }
1224 let (state, data) = self.query(sql.as_str());
1225 if state {
1226 if data.len() > 1 {
1227 return data;
1228 }
1229 data[0][field].clone()
1230 } else {
1231 JsonValue::from(0)
1232 }
1233 }
1234
1235 fn sum(&mut self, field: &str) -> JsonValue {
1236 self.params.fields[field] = format!("sum({field}) as {field}").into();
1237 let sql = self.params.select_sql();
1238 if self.params.sql {
1239 return JsonValue::from(sql.clone());
1240 }
1241 let (state, data) = self.query(sql.as_str());
1242 match state {
1243 true => {
1244 if data.len() > 1 {
1245 return data;
1246 }
1247 data[0][field].clone()
1248 }
1249 false => JsonValue::from(0),
1250 }
1251 }
1252
1253 fn avg(&mut self, field: &str) -> JsonValue {
1254 self.params.fields[field] = format!("avg({field}) as {field}").into();
1255 let sql = self.params.select_sql();
1256 if self.params.sql {
1257 return JsonValue::from(sql.clone());
1258 }
1259 let (state, data) = self.query(sql.as_str());
1260 if state {
1261 if data.len() > 1 {
1262 return data;
1263 }
1264 data[0][field].clone()
1265 } else {
1266 JsonValue::from(0)
1267 }
1268 }
1269
1270 fn having(&mut self, expr: &str) -> &mut Self {
1271 self.params.having.push(expr.to_string());
1272 self
1273 }
1274
1275 fn select(&mut self) -> JsonValue {
1276 let sql = self.params.select_sql();
1277 if self.params.sql {
1278 return JsonValue::from(sql.clone());
1279 }
1280 let (state, mut data) = self.query(sql.as_str());
1281 match state {
1282 true => {
1283 for (field, _) in self.params.json.entries() {
1284 for item in data.members_mut() {
1285 if !item[field].is_empty() {
1286 let json = item[field].to_string();
1287 item[field] = match json::parse(&json) {
1288 Ok(e) => e,
1289 Err(_) => JsonValue::from(json),
1290 };
1291 }
1292 }
1293 }
1294 data.clone()
1295 }
1296 false => array![],
1297 }
1298 }
1299
1300 fn find(&mut self) -> JsonValue {
1301 self.params.page = 1;
1302 self.params.limit = 1;
1303 let sql = self.params.select_sql();
1304 if self.params.sql {
1305 return JsonValue::from(sql.clone());
1306 }
1307 let (state, mut data) = self.query(sql.as_str());
1308 match state {
1309 true => {
1310 if data.is_empty() {
1311 return object! {};
1312 }
1313 for (field, _) in self.params.json.entries() {
1314 if !data[0][field].is_empty() {
1315 let json = data[0][field].to_string();
1316 let json = json::parse(&json).unwrap_or(array![]);
1317 data[0][field] = json;
1318 } else {
1319 data[0][field] = array![];
1320 }
1321 }
1322 data[0].clone()
1323 }
1324 false => {
1325 object! {}
1326 }
1327 }
1328 }
1329
1330 fn value(&mut self, field: &str) -> JsonValue {
1331 self.params.fields = object! {};
1332 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1333 self.params.page = 1;
1334 self.params.limit = 1;
1335 let sql = self.params.select_sql();
1336 if self.params.sql {
1337 return JsonValue::from(sql.clone());
1338 }
1339 let (state, mut data) = self.query(sql.as_str());
1340 match state {
1341 true => {
1342 for (field, _) in self.params.json.entries() {
1343 if !data[0][field].is_empty() {
1344 let json = data[0][field].to_string();
1345 let json = json::parse(&json).unwrap_or(array![]);
1346 data[0][field] = json;
1347 } else {
1348 data[0][field] = array![];
1349 }
1350 }
1351 data[0][field].clone()
1352 }
1353 false => {
1354 if self.connection.debug {
1355 info!("{data:?}");
1356 }
1357 JsonValue::Null
1358 }
1359 }
1360 }
1361
1362 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1363 let fields_list = self.table_info(&self.params.table.clone());
1364 let mut fields = vec![];
1365 let mut values = vec![];
1366 if !self.params.autoinc && data["id"].is_empty() {
1367 let thread_id = format!("{:?}", std::thread::current().id());
1368 let thread_num: u64 = thread_id
1369 .trim_start_matches("ThreadId(")
1370 .trim_end_matches(")")
1371 .parse()
1372 .unwrap_or(0);
1373 data["id"] = format!(
1374 "{:X}{:X}",
1375 Local::now().timestamp_nanos_opt().unwrap_or(0),
1376 thread_num
1377 )
1378 .into();
1379 }
1380 for (field, value) in data.entries() {
1381 fields.push(format!("\"{}\"", field));
1382
1383 if value.is_string() {
1384 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1385 continue;
1386 } else if value.is_array() {
1387 if self.params.json[field].is_empty() {
1388 let array = value
1389 .members()
1390 .map(|x| x.as_str().unwrap_or(""))
1391 .collect::<Vec<&str>>()
1392 .join(",");
1393 values.push(format!("'{array}'"));
1394 } else {
1395 let json = value.to_string();
1396 let json = json.replace("'", "''");
1397 values.push(format!("'{json}'"));
1398 }
1399 continue;
1400 } else if value.is_object() {
1401 if self.params.json[field].is_empty() {
1402 values.push(format!("'{value}'"));
1403 } else {
1404 let json = value.to_string();
1405 let json = json.replace("'", "''");
1406 values.push(format!("'{json}'"));
1407 }
1408 continue;
1409 } else if value.is_number() {
1410 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1411 if col_type == "boolean" {
1412 let bool_val = value.as_i64().unwrap_or(0) != 0;
1413 values.push(format!("{bool_val}"));
1414 } else if col_type.contains("int") {
1415 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1416 } else {
1417 values.push(format!("{value}"));
1418 }
1419 continue;
1420 } else if value.is_boolean() || value.is_null() {
1421 values.push(format!("{value}"));
1422 continue;
1423 } else {
1424 values.push(format!("'{value}'"));
1425 continue;
1426 }
1427 }
1428 let fields = fields.join(",");
1429 let values = values.join(",");
1430
1431 let sql = format!(
1432 "INSERT INTO {} ({}) VALUES ({});",
1433 self.params.table, fields, values
1434 );
1435 if self.params.sql {
1436 return JsonValue::from(sql.clone());
1437 }
1438 let (state, ids) = self.execute(sql.as_str());
1439
1440 match state {
1441 true => match self.params.autoinc {
1442 true => ids.clone(),
1443 false => data["id"].clone(),
1444 },
1445 false => {
1446 let thread_id = format!("{:?}", thread::current().id());
1447 error!("添加失败: {thread_id} {ids:?} {sql}");
1448 JsonValue::from("")
1449 }
1450 }
1451 }
1452 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1453 let fields_list = self.table_info(&self.params.table.clone());
1454 let mut fields = String::new();
1455 if !self.params.autoinc && data[0]["id"].is_empty() {
1456 data[0]["id"] = "".into();
1457 }
1458 for (field, _) in data[0].entries() {
1459 fields = format!("{fields},\"{field}\"");
1460 }
1461 fields = fields.trim_start_matches(",").to_string();
1462
1463 let core_count = num_cpus::get();
1464 let mut p = pools::Pool::new(core_count * 4);
1465
1466 let autoinc = self.params.autoinc;
1467 for list in data.members() {
1468 let mut item = list.clone();
1469 let i = br_fields::str::Code::verification_code(3);
1470 let fields_list_new = fields_list.clone();
1471 p.execute(move |pcindex| {
1472 if !autoinc && item["id"].is_empty() {
1473 let id = format!(
1474 "{:X}{:X}{}",
1475 Local::now().timestamp_nanos_opt().unwrap_or(0),
1476 pcindex,
1477 i
1478 );
1479 item["id"] = id.into();
1480 }
1481 let mut values = "".to_string();
1482 for (field, value) in item.entries() {
1483 if value.is_string() {
1484 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1485 } else if value.is_number() {
1486 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1487 if col_type == "boolean" {
1488 let bool_val = value.as_i64().unwrap_or(0) != 0;
1489 values = format!("{values},{bool_val}");
1490 } else if col_type.contains("int") {
1491 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1492 } else {
1493 values = format!("{values},{value}");
1494 }
1495 } else if value.is_boolean() {
1496 values = format!("{values},{value}");
1497 continue;
1498 } else {
1499 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1500 }
1501 }
1502 values = format!("({})", values.trim_start_matches(","));
1503 array![item["id"].clone(), values]
1504 });
1505 }
1506 let (ids_list, mut values) = p.insert_all();
1507 values = values.trim_start_matches(",").to_string();
1508 let sql = format!(
1509 "INSERT INTO {} ({}) VALUES {};",
1510 self.params.table, fields, values
1511 );
1512
1513 if self.params.sql {
1514 return JsonValue::from(sql.clone());
1515 }
1516 let (state, data) = self.execute(sql.as_str());
1517 match state {
1518 true => match autoinc {
1519 true => data,
1520 false => JsonValue::from(ids_list),
1521 },
1522 false => {
1523 error!("insert_all: {data:?}");
1524 array![]
1525 }
1526 }
1527 }
1528 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1529 let fields_list = self.table_info(&self.params.table.clone());
1530 let mut fields = vec![];
1531 let mut values = vec![];
1532 if !self.params.autoinc && data["id"].is_empty() {
1533 let thread_id = format!("{:?}", std::thread::current().id());
1534 let thread_num: u64 = thread_id
1535 .trim_start_matches("ThreadId(")
1536 .trim_end_matches(")")
1537 .parse()
1538 .unwrap_or(0);
1539 data["id"] = format!(
1540 "{:X}{:X}",
1541 Local::now().timestamp_nanos_opt().unwrap_or(0),
1542 thread_num
1543 )
1544 .into();
1545 }
1546 for (field, value) in data.entries() {
1547 fields.push(format!("\"{}\"", field));
1548
1549 if value.is_string() {
1550 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1551 continue;
1552 } else if value.is_array() {
1553 if self.params.json[field].is_empty() {
1554 let array = value
1555 .members()
1556 .map(|x| x.as_str().unwrap_or(""))
1557 .collect::<Vec<&str>>()
1558 .join(",");
1559 values.push(format!("'{array}'"));
1560 } else {
1561 let json = value.to_string();
1562 let json = json.replace("'", "''");
1563 values.push(format!("'{json}'"));
1564 }
1565 continue;
1566 } else if value.is_object() {
1567 if self.params.json[field].is_empty() {
1568 values.push(format!("'{value}'"));
1569 } else {
1570 let json = value.to_string();
1571 let json = json.replace("'", "''");
1572 values.push(format!("'{json}'"));
1573 }
1574 continue;
1575 } else if value.is_number() {
1576 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1577 if col_type == "boolean" {
1578 let bool_val = value.as_i64().unwrap_or(0) != 0;
1579 values.push(format!("{bool_val}"));
1580 } else if col_type.contains("int") {
1581 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1582 } else {
1583 values.push(format!("{value}"));
1584 }
1585 continue;
1586 } else if value.is_boolean() || value.is_null() {
1587 values.push(format!("{value}"));
1588 continue;
1589 } else {
1590 values.push(format!("'{value}'"));
1591 continue;
1592 }
1593 }
1594
1595 let conflict_cols: Vec<String> = conflict_fields
1596 .iter()
1597 .map(|f| format!("\"{}\"", f))
1598 .collect();
1599
1600 let update_set: Vec<String> = fields
1601 .iter()
1602 .filter(|f| {
1603 let name = f.trim_matches('"');
1604 !conflict_fields.contains(&name) && name != "id"
1605 })
1606 .map(|f| format!("{f}=EXCLUDED.{f}"))
1607 .collect();
1608
1609 let fields_str = fields.join(",");
1610 let values_str = values.join(",");
1611
1612 let sql = format!(
1613 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1614 self.params.table,
1615 fields_str,
1616 values_str,
1617 conflict_cols.join(","),
1618 update_set.join(",")
1619 );
1620 if self.params.sql {
1621 return JsonValue::from(sql.clone());
1622 }
1623 let (state, result) = self.execute(sql.as_str());
1624 match state {
1625 true => match self.params.autoinc {
1626 true => result.clone(),
1627 false => data["id"].clone(),
1628 },
1629 false => {
1630 let thread_id = format!("{:?}", thread::current().id());
1631 error!("upsert失败: {thread_id} {result:?} {sql}");
1632 JsonValue::from("")
1633 }
1634 }
1635 }
1636 fn update(&mut self, data: JsonValue) -> JsonValue {
1637 let fields_list = self.table_info(&self.params.table.clone());
1638 let mut values = vec![];
1639 for (field, value) in data.entries() {
1640 if value.is_string() {
1641 values.push(format!(
1642 "\"{}\"='{}'",
1643 field,
1644 value.to_string().replace("'", "''")
1645 ));
1646 } else if value.is_number() {
1647 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1648 if col_type == "boolean" {
1649 let bool_val = value.as_i64().unwrap_or(0) != 0;
1650 values.push(format!("\"{field}\"= {bool_val}"));
1651 } else if col_type.contains("int") {
1652 values.push(format!(
1653 "\"{}\"= {}",
1654 field,
1655 value.as_f64().unwrap_or(0.0) as i64
1656 ));
1657 } else {
1658 values.push(format!("\"{field}\"= {value}"));
1659 }
1660 } else if value.is_array() {
1661 if self.params.json[field].is_empty() {
1662 let array = value
1663 .members()
1664 .map(|x| x.as_str().unwrap_or(""))
1665 .collect::<Vec<&str>>()
1666 .join(",");
1667 values.push(format!("\"{field}\"='{array}'"));
1668 } else {
1669 let json = value.to_string();
1670 let json = json.replace("'", "''");
1671 values.push(format!("\"{field}\"='{json}'"));
1672 }
1673 continue;
1674 } else if value.is_object() {
1675 if self.params.json[field].is_empty() {
1676 values.push(format!("\"{field}\"='{value}'"));
1677 } else {
1678 if value.is_empty() {
1679 values.push(format!("\"{field}\"=''"));
1680 continue;
1681 }
1682 let json = value.to_string();
1683 let json = json.replace("'", "''");
1684 values.push(format!("\"{field}\"='{json}'"));
1685 }
1686 continue;
1687 } else if value.is_boolean() {
1688 values.push(format!("\"{field}\"= {value}"));
1689 } else {
1690 values.push(format!("\"{field}\"=\"{value}\""));
1691 }
1692 }
1693
1694 for (field, value) in self.params.inc_dec.entries() {
1695 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1696 }
1697 if !self.params.update_column.is_empty() {
1698 values.extend(self.params.update_column.clone());
1699 }
1700 let values = values.join(",");
1701
1702 let sql = format!(
1703 "UPDATE {} SET {} {};",
1704 self.params.table.clone(),
1705 values,
1706 self.params.where_sql()
1707 );
1708 if self.params.sql {
1709 return JsonValue::from(sql.clone());
1710 }
1711 let (state, data) = self.execute(sql.as_str());
1712 if state {
1713 data
1714 } else {
1715 let thread_id = format!("{:?}", thread::current().id());
1716 error!("update: {thread_id} {data:?} {sql}");
1717 0.into()
1718 }
1719 }
1720 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1721 let fields_list = self.table_info(&self.params.table.clone());
1722 let mut values = vec![];
1723
1724 let mut ids = vec![];
1725 for (field, _) in data[0].entries() {
1726 if field == "id" {
1727 continue;
1728 }
1729 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1730 let mut fields = vec![];
1731 for row in data.members() {
1732 let value = row[field].clone();
1733 let id = row["id"].clone();
1734 ids.push(id.clone());
1735 if value.is_string() {
1736 fields.push(format!(
1737 "WHEN '{}' THEN '{}'",
1738 id,
1739 value.to_string().replace("'", "''")
1740 ));
1741 } else if value.is_array() || value.is_object() {
1742 if self.params.json[field].is_empty() {
1743 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1744 } else {
1745 let json = value.to_string();
1746 let json = json.replace("'", "''");
1747 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1748 }
1749 continue;
1750 } else if value.is_number() {
1751 if col_type == "boolean" {
1752 let bool_val = value.as_i64().unwrap_or(0) != 0;
1753 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1754 } else {
1755 fields.push(format!("WHEN '{id}' THEN {value}"));
1756 }
1757 } else if value.is_boolean() || value.is_null() {
1758 fields.push(format!("WHEN '{id}' THEN {value}"));
1759 } else {
1760 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1761 }
1762 }
1763 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1764 }
1765 self.where_and("id", "in", ids.into());
1766 for (field, value) in self.params.inc_dec.entries() {
1767 values.push(format!("{} = {}", field, value.to_string().clone()));
1768 }
1769
1770 let values = values.join(",");
1771 let sql = format!(
1772 "UPDATE {} SET {} {} {};",
1773 self.params.table.clone(),
1774 values,
1775 self.params.where_sql(),
1776 self.params.page_limit_sql()
1777 );
1778 if self.params.sql {
1779 return JsonValue::from(sql.clone());
1780 }
1781 let (state, data) = self.execute(sql.as_str());
1782 if state {
1783 data
1784 } else {
1785 error!("update_all: {data:?}");
1786 JsonValue::from(0)
1787 }
1788 }
1789
1790 fn delete(&mut self) -> JsonValue {
1791 let sql = format!(
1792 "delete FROM {} {} {};",
1793 self.params.table.clone(),
1794 self.params.where_sql(),
1795 self.params.page_limit_sql()
1796 );
1797 if self.params.sql {
1798 return JsonValue::from(sql.clone());
1799 }
1800 let (state, data) = self.execute(sql.as_str());
1801 match state {
1802 true => data,
1803 false => {
1804 error!("delete 失败>>> {data:?}");
1805 JsonValue::from(0)
1806 }
1807 }
1808 }
1809
1810 fn transaction(&mut self) -> bool {
1811 let thread_id = format!("{:?}", thread::current().id());
1812 let key = format!("{}{}", self.default, thread_id);
1813
1814 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1815 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1816 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1817 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1818 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1819 return true;
1820 }
1821
1822 let mut conn = None;
1824 for attempt in 0..3u8 {
1825 match self.client.get_connect_for_transaction() {
1826 Ok(mut c) => {
1827 if c.is_valid() {
1828 conn = Some(c);
1829 break;
1830 }
1831 warn!("事务连接无效(第{}次)", attempt + 1);
1832 self.client.release_transaction_conn();
1833 }
1834 Err(e) => {
1835 warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1836 }
1837 }
1838 if attempt < 2 {
1839 thread::sleep(std::time::Duration::from_millis(200));
1840 }
1841 }
1842 let mut conn = match conn {
1843 Some(c) => c,
1844 None => {
1845 error!("获取事务连接重试耗尽");
1846 return false;
1847 }
1848 };
1849
1850 if let Err(e) = conn.execute("START TRANSACTION") {
1851 error!("启动事务失败: {e}");
1852 self.client.release_transaction_conn();
1853 return false;
1854 }
1855
1856 if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1857 error!("设置事务隔离级别失败: {e}");
1858 let _ = conn.execute("ROLLBACK");
1859 self.client.release_transaction_conn();
1860 return false;
1861 }
1862
1863 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1864 true
1865 }
1866 fn commit(&mut self) -> bool {
1867 let thread_id = format!("{:?}", thread::current().id());
1868 let key = format!("{}{}", self.default, thread_id);
1869
1870 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1871 error!("commit: 没有活跃的事务");
1872 return false;
1873 }
1874
1875 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1876 if depth > 1 {
1877 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1878 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1879 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1880 return true;
1881 }
1882
1883 let commit_result =
1884 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1885
1886 let success = match commit_result {
1887 Some(Ok(_)) => true,
1888 Some(Err(e)) => {
1889 error!("提交事务失败: {e}");
1890 false
1891 }
1892 None => {
1893 error!("提交事务失败: 未找到连接");
1894 false
1895 }
1896 };
1897
1898 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1899 self.client.release_transaction_conn();
1900 success
1901 }
1902
1903 fn rollback(&mut self) -> bool {
1904 let thread_id = format!("{:?}", thread::current().id());
1905 let key = format!("{}{}", self.default, thread_id);
1906
1907 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1908 error!("rollback: 没有活跃的事务");
1909 return false;
1910 }
1911
1912 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1913 if depth > 1 {
1914 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1915 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1916 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1917 return true;
1918 }
1919
1920 let rollback_result =
1921 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1922
1923 let success = match rollback_result {
1924 Some(Ok(_)) => true,
1925 Some(Err(e)) => {
1926 error!("回滚失败: {e}");
1927 false
1928 }
1929 None => {
1930 error!("回滚失败: 未找到连接");
1931 false
1932 }
1933 };
1934
1935 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1936 self.client.release_transaction_conn();
1937 success
1938 }
1939
1940 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1941 let (state, data) = self.query(sql);
1942 match state {
1943 true => Ok(data),
1944 false => Err(data.to_string()),
1945 }
1946 }
1947
1948 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1949 let (state, data) = self.execute(sql);
1950 match state {
1951 true => Ok(data),
1952 false => Err(data.to_string()),
1953 }
1954 }
1955
1956 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1957 self.params.inc_dec[field] = format!("{field} + {num}").into();
1958 self
1959 }
1960
1961 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1962 self.params.inc_dec[field] = format!("{field} - {num}").into();
1963 self
1964 }
1965 fn buildsql(&mut self) -> String {
1966 self.fetch_sql();
1967 let sql = self.select().to_string();
1968 format!("( {} ) {}", sql, self.params.table)
1969 }
1970
1971 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1972 for field in fields {
1973 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1974 }
1975 self
1976 }
1977
1978 fn join(
1979 &mut self,
1980 main_table: &str,
1981 main_fields: &str,
1982 right_table: &str,
1983 right_fields: &str,
1984 ) -> &mut Self {
1985 let main_table = if main_table.is_empty() {
1986 self.params.table.clone()
1987 } else {
1988 main_table.to_string()
1989 };
1990 self.params.join_table = right_table.to_string();
1991 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1992 self
1993 }
1994
1995 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1996 let main_fields = if main_fields.is_empty() {
1997 "id"
1998 } else {
1999 main_fields
2000 };
2001 let second_fields = if second_fields.is_empty() {
2002 self.params.table.clone()
2003 } else {
2004 second_fields.to_string().clone()
2005 };
2006 let sec_table_name = format!("{}{}", table, "_2");
2007 let second_table = format!("{} {}", table, sec_table_name.clone());
2008 self.params.join_table = sec_table_name.clone();
2009 self.params.join.push(format!(
2010 " INNER JOIN {} ON {}.{} = {}.{}",
2011 second_table, self.params.table, main_fields, sec_table_name, second_fields
2012 ));
2013 self
2014 }
2015
2016 fn join_right(
2017 &mut self,
2018 main_table: &str,
2019 main_fields: &str,
2020 right_table: &str,
2021 right_fields: &str,
2022 ) -> &mut Self {
2023 let main_table = if main_table.is_empty() {
2024 self.params.table.clone()
2025 } else {
2026 main_table.to_string()
2027 };
2028 self.params.join_table = right_table.to_string();
2029 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2030 self
2031 }
2032
2033 fn join_full(
2034 &mut self,
2035 main_table: &str,
2036 main_fields: &str,
2037 right_table: &str,
2038 right_fields: &str,
2039 ) -> &mut Self {
2040 let main_table = if main_table.is_empty() {
2041 self.params.table.clone()
2042 } else {
2043 main_table.to_string()
2044 };
2045 self.params.join_table = right_table.to_string();
2046 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2047 self
2048 }
2049
2050 fn union(&mut self, sub_sql: &str) -> &mut Self {
2051 self.params.unions.push(format!("UNION {sub_sql}"));
2052 self
2053 }
2054
2055 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2056 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2057 self
2058 }
2059
2060 fn lock_for_update(&mut self) -> &mut Self {
2061 self.params.lock_mode = "FOR UPDATE".to_string();
2062 self
2063 }
2064
2065 fn lock_for_share(&mut self) -> &mut Self {
2066 self.params.lock_mode = "FOR SHARE".to_string();
2067 self
2068 }
2069}