1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
/// PostgreSQL backend handle: connection settings, query-builder state and
/// the pool that statements are executed through.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Name combined with the current thread id to build the key used to look
    /// up an active transaction.
    pub default: String,
    /// Query-builder state accumulated by the chained builder methods.
    pub params: Params,
    /// Connection pool used for statements executed outside a transaction.
    pub client: Pools,
}
20
21impl Pgsql {
22 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23 let port = connection
24 .hostport
25 .parse::<i32>()
26 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28 let cp_connection = connection.clone();
29 let config = object! {
30 debug: cp_connection.debug,
31 username: cp_connection.username,
32 userpass: cp_connection.userpass,
33 database: cp_connection.database,
34 hostname: cp_connection.hostname,
35 hostport: port,
36 charset: cp_connection.charset.str(),
37 };
38 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40 let pools = pgsql.pools()?;
41 Ok(Self {
42 connection,
43 default: default.clone(),
44 params: Params::default("pgsql"),
45 client: pools,
46 })
47 }
48
49 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
50 let thread_id = format!("{:?}", thread::current().id());
51 let key = format!("{}{}", self.default, thread_id);
52
53 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
54 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
55
56 match result {
57 Some(Ok(e)) => {
58 if self.connection.debug {
59 info!("查询成功: {} {}", thread_id.clone(), sql);
60 }
61 (true, e.rows)
62 }
63 Some(Err(e)) => {
64 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
65 (false, JsonValue::from(e.to_string()))
66 }
67 None => {
68 error!("事务查询失败: 未找到事务连接 {thread_id}");
69 (false, JsonValue::from("未找到事务连接"))
70 }
71 }
72 } else {
73 let mut guard = match self.client.get_guard() {
74 Ok(g) => g,
75 Err(e) => {
76 error!(
77 "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
78 );
79 return (false, JsonValue::from(e.to_string()));
80 }
81 };
82 let res = guard.conn().query(sql);
83 match res {
84 Ok(e) => {
85 if self.connection.debug {
86 info!("查询成功: {} {}", thread_id.clone(), sql);
87 }
88 (true, e.rows)
89 }
90 Err(ref e) if Self::is_retriable_error(e) => {
91 warn!("非事务查询连接断开,重试: {thread_id} {e}");
92 drop(guard);
93 let mut guard2 = match self.client.get_guard() {
94 Ok(g) => g,
95 Err(e2) => {
96 error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e2} SQL语句: [{sql}]");
97 return (false, JsonValue::from(e2.to_string()));
98 }
99 };
100 match guard2.conn().query(sql) {
101 Ok(e) => {
102 if self.connection.debug {
103 info!("查询重试成功: {} {}", thread_id.clone(), sql);
104 }
105 (true, e.rows)
106 }
107 Err(e2) => {
108 error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e2} SQL语句: [{sql}]");
109 (false, JsonValue::from(e2.to_string()))
110 }
111 }
112 }
113 Err(e) => {
114 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
115 (false, JsonValue::from(e.to_string()))
116 }
117 }
118 }
119 }
120 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
121 let thread_id = format!("{:?}", thread::current().id());
122 let key = format!("{}{}", self.default, thread_id);
123
124 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
125 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
126
127 match result {
128 Some(Ok(e)) => {
129 if self.connection.debug {
130 info!("提交成功: {} {}", thread_id.clone(), sql);
131 }
132 if sql.contains("INSERT") {
133 (true, e.rows)
134 } else {
135 (true, e.affect_count.into())
136 }
137 }
138 Some(Err(e)) => {
139 error!("事务提交失败: {thread_id} {e} {sql}");
140 (false, JsonValue::from(e.to_string()))
141 }
142 None => {
143 error!("事务执行失败: 未找到事务连接 {thread_id}");
144 (false, JsonValue::from("未找到事务连接"))
145 }
146 }
147 } else {
148 let mut guard = match self.client.get_guard() {
149 Ok(g) => g,
150 Err(e) => {
151 error!(
152 "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
153 );
154 return (false, JsonValue::from(e.to_string()));
155 }
156 };
157 let res = guard.conn().execute(sql);
158 match res {
159 Ok(e) => {
160 if self.connection.debug {
161 info!("提交成功: {} {}", thread_id.clone(), sql);
162 }
163 if sql.contains("INSERT") {
164 (true, e.rows)
165 } else {
166 (true, e.affect_count.into())
167 }
168 }
169 Err(ref e) if Self::is_retriable_error(e) => {
170 warn!("非事务执行连接断开,重试: {thread_id} {e}");
171 drop(guard);
172 let mut guard2 = match self.client.get_guard() {
173 Ok(g) => g,
174 Err(e2) => {
175 error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e2} SQL语句: [{sql}]");
176 return (false, JsonValue::from(e2.to_string()));
177 }
178 };
179 match guard2.conn().execute(sql) {
180 Ok(e) => {
181 if self.connection.debug {
182 info!("执行重试成功: {} {}", thread_id.clone(), sql);
183 }
184 if sql.contains("INSERT") {
185 (true, e.rows)
186 } else {
187 (true, e.affect_count.into())
188 }
189 }
190 Err(e2) => {
191 error!("非事务执行重试失败: {thread_id} {e2} {sql}");
192 (false, JsonValue::from(e2.to_string()))
193 }
194 }
195 }
196 Err(e) => {
197 error!("非事务提交失败: {thread_id} {e} {sql}");
198 (false, JsonValue::from(e.to_string()))
199 }
200 }
201 }
202 }
203
204 fn is_retriable_error(e: &PgsqlError) -> bool {
206 matches!(e, PgsqlError::Connection(_) | PgsqlError::Io(_))
207 }
208}
209
210impl DbMode for Pgsql {
211 fn database_tables(&mut self) -> JsonValue {
212 let sql = "SHOW TABLES".to_string();
213 match self.sql(sql.as_str()) {
214 Ok(e) => {
215 let mut list = vec![];
216 for item in e.members() {
217 for (_, value) in item.entries() {
218 list.push(value.clone());
219 }
220 }
221 list.into()
222 }
223 Err(_) => {
224 array![]
225 }
226 }
227 }
228
229 fn database_create(&mut self, name: &str) -> bool {
230 let sql = format!("CREATE DATABASE {name}");
231
232 let (state, data) = self.execute(sql.as_str());
233 match state {
234 true => data.as_bool().unwrap_or(true),
235 false => {
236 error!("创建数据库失败: {data:?}");
237 false
238 }
239 }
240 }
241
242 fn truncate(&mut self, table: &str) -> bool {
243 let sql = format!("TRUNCATE TABLE {table}");
244 let (state, _) = self.execute(sql.as_str());
245 state
246 }
247}
248
249impl Mode for Pgsql {
250 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
251 let mut sql = String::new();
252 let mut comments = vec![];
253
254 if !options.table_unique.is_empty() {
255 let full_name = format!(
256 "{}_unique_{}",
257 options.table_name,
258 options.table_unique.join("_")
259 );
260 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
261 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
262 let unique = format!(
263 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
264 name,
265 options.table_name,
266 options.table_unique.join(",")
267 );
268 comments.push(unique);
269 }
270
271 for row in options.table_index.iter() {
272 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
273 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
274 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
275 let index = format!(
276 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
277 name,
278 options.table_name,
279 row.join(",")
280 );
281 comments.push(index);
282 }
283
284 for (name, field) in options.table_fields.entries_mut() {
285 field["table_name"] = options.table_name.clone().into();
286 let row = br_fields::field("pgsql", name, field.clone());
287 let (col_sql, meta) = if let Some(idx) = row.find("--") {
288 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
289 } else {
290 (row.trim(), None)
291 };
292 if let Some(meta) = meta {
293 comments.push(format!(
294 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
295 options.table_name, name, meta
296 ));
297 }
298 sql = format!("{} {},\r\n", sql, col_sql);
299 }
300
301 let primary_key = format!(
302 "CONSTRAINT {}_{} PRIMARY KEY ({})",
303 options.table_name, options.table_key, options.table_key
304 );
305 let sql = format!(
306 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
307 options.table_name,
308 sql.trim_end_matches(",\r\n"),
309 primary_key
310 );
311 comments.insert(0, sql);
312
313 for (_name, field) in options.table_fields.entries() {
314 let _ = field["mode"].as_str();
315 }
316
317 if self.params.sql {
318 let info = comments.join("\r\n");
319 return JsonValue::from(info);
320 }
321 for comment in comments {
322 let (state, _) = self.execute(comment.as_str());
323 match state {
324 true => {}
325 false => {
326 return JsonValue::from(state);
327 }
328 }
329 }
330 JsonValue::from(true)
331 }
332
333 fn table_update(&mut self, options: TableOptions) -> JsonValue {
334 let fields_list = self.table_info(&options.table_name);
335 let mut put = vec![];
336 let mut add = vec![];
337 let mut del = vec![];
338 let mut comments = vec![];
339
340 for (key, _) in fields_list.entries() {
341 if options.table_fields[key].is_empty() {
342 del.push(key);
343 }
344 }
345 for (name, field) in options.table_fields.entries() {
346 if !fields_list[name].is_empty() {
347 let old_info = &fields_list[name];
348 let new_field_sql = br_fields::field("pgsql", name, field.clone());
349
350 let old_comment = old_info["comment"].as_str().unwrap_or("");
351 let old_type = old_info["type"].as_str().unwrap_or("");
352
353 let new_comment = if let Some(idx) = new_field_sql.find("--") {
354 new_field_sql[idx + 2..].trim()
355 } else {
356 ""
357 };
358
359 let comment_matches =
360 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
361 let old_parts: Vec<&str> = old_comment.split('|').collect();
362 let new_parts: Vec<&str> = new_comment.split('|').collect();
363 if old_parts.len() >= 4 && new_parts.len() >= 4 {
364 old_parts[..4] == new_parts[..4]
365 } else {
366 old_comment == new_comment
367 }
368 } else if !old_comment.is_empty() && !new_comment.is_empty() {
369 let old_parts: Vec<&str> = old_comment.split('|').collect();
370 let new_parts: Vec<&str> = new_comment.split('|').collect();
371 if old_parts.len() >= 2
372 && new_parts.len() >= 2
373 && old_parts.len() == new_parts.len()
374 {
375 let old_filtered: Vec<&str> = old_parts
376 .iter()
377 .filter(|v| **v != "true" && **v != "false")
378 .copied()
379 .collect();
380 let new_filtered: Vec<&str> = new_parts
381 .iter()
382 .filter(|v| **v != "true" && **v != "false")
383 .copied()
384 .collect();
385 old_filtered == new_filtered
386 } else {
387 old_comment == new_comment
388 }
389 } else {
390 old_comment == new_comment
391 };
392
393 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
394 let new_type = if sql_parts.len() > 1 {
395 sql_parts[1].to_lowercase()
396 } else {
397 String::new()
398 };
399
400 let type_matches = match old_type {
401 "integer" => {
402 new_type.contains("int")
403 && !new_type.contains("bigint")
404 && !new_type.contains("smallint")
405 }
406 "bigint" => new_type.contains("bigint"),
407 "smallint" => new_type.contains("smallint"),
408 "boolean" => new_type.contains("boolean"),
409 "text" => new_type.contains("text"),
410 "character varying" => new_type.contains("varchar"),
411 "character" => new_type.contains("char") && !new_type.contains("varchar"),
412 "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
413 "double precision" => {
414 new_type.contains("double") || new_type.contains("float8")
415 }
416 "real" => new_type.contains("real") || new_type.contains("float4"),
417 "timestamp without time zone" | "timestamp with time zone" => {
418 new_type.contains("timestamp")
419 }
420 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
421 "time without time zone" | "time with time zone" => {
422 new_type.contains("time") && !new_type.contains("timestamp")
423 }
424 "json" | "jsonb" => new_type.contains("json"),
425 "uuid" => new_type.contains("uuid"),
426 "bytea" => new_type.contains("bytea"),
427 _ => old_type == new_type,
428 };
429
430 if type_matches && comment_matches {
431 continue;
432 }
433
434 log::debug!(
435 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
436 options.table_name,
437 name,
438 type_matches,
439 old_type,
440 new_type,
441 comment_matches
442 );
443 put.push(name);
444 } else {
445 add.push(name);
446 }
447 }
448
449 for name in add.iter() {
450 let name = name.to_string();
451 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
452 let rows = row.split("--").collect::<Vec<&str>>();
453 comments.push(format!(
454 r#"ALTER TABLE "{}" add {};"#,
455 options.table_name,
456 rows[0].trim()
457 ));
458 if rows.len() > 1 {
459 comments.push(format!(
460 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
461 options.table_name,
462 name,
463 rows[1].trim()
464 ));
465 }
466 }
467 for name in del.iter() {
468 comments.push(format!(
469 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
470 options.table_name, name
471 ));
472 }
473 for name in put.iter() {
474 let name = name.to_string();
475 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
476 let rows = row.split("--").collect::<Vec<&str>>();
477
478 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
479
480 if sql[1].contains("BOOLEAN") {
481 let text = format!(
482 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
483 options.table_name, name
484 );
485 comments.push(text.clone());
486 let text = format!(
487 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
488 options.table_name, name, sql[1]
489 );
490 comments.push(text.clone());
491 } else {
492 let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
493 let new_type_lower = sql[1].to_lowercase();
494 let is_date_to_numeric = (old_col_type == "date"
495 || old_col_type.contains("timestamp"))
496 && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
497 if is_date_to_numeric {
498 comments.push(format!(
499 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
500 options.table_name, name
501 ));
502 comments.push(format!(
503 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
504 options.table_name, name, sql[1], name, name, name
505 ));
506 } else {
507 let text = format!(
508 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
509 options.table_name, name, sql[1]
510 );
511 comments.push(text.clone());
512 }
513 };
514
515 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
516 let default_value = rows[0][default_pos + 9..].trim();
517 if !default_value.is_empty() {
518 comments.push(format!(
519 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
520 options.table_name, name, default_value
521 ));
522 }
523 }
524 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
527 .as_str()
528 .unwrap_or("YES");
529 let old_is_required = old_is_nullable == "NO";
530
531 if old_is_required && name != options.table_key {
533 comments.push(format!(
534 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
535 options.table_name, name
536 ));
537 }
538
539 if rows.len() > 1 {
540 comments.push(format!(
541 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
542 options.table_name,
543 name,
544 rows[1].trim()
545 ));
546 }
547 }
548
549 let mut unique_new = vec![];
550 let mut index_new = vec![];
551 let mut primary_key = vec![];
552 let (_, index_list) = self.query(
553 format!(
554 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
555 options.table_name
556 )
557 .as_str(),
558 );
559 for item in index_list.members() {
560 let key_name = item["indexname"].as_str().unwrap_or("");
561 let indexdef = item["indexdef"].to_string();
562
563 if indexdef.contains(
564 format!(
565 "CREATE UNIQUE INDEX {}_{} ON",
566 options.table_name, options.table_key
567 )
568 .as_str(),
569 ) {
570 primary_key.push(key_name.to_string());
571 continue;
572 }
573 if indexdef.contains("CREATE UNIQUE INDEX") {
574 unique_new.push(key_name.to_string());
575 continue;
576 }
577 if indexdef.contains("CREATE INDEX") {
578 index_new.push(key_name.to_string());
579 continue;
580 }
581 }
582
583 if !options.table_unique.is_empty() {
584 let full_name = format!(
585 "{}_unique_{}",
586 options.table_name,
587 options.table_unique.join("_")
588 );
589 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
590 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
591 let unique = format!(
592 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
593 name,
594 options.table_name,
595 options.table_unique.join(",")
596 );
597 if !unique_new.contains(&name) {
598 comments.push(unique);
599 }
600 unique_new.retain(|x| *x != name);
601 }
602
603 for row in options.table_index.iter() {
604 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
605 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
606 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
607 let index = format!(
608 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
609 name,
610 options.table_name,
611 row.join(",")
612 );
613 if !index_new.contains(&name) {
614 comments.push(index);
615 }
616 index_new.retain(|x| *x != name);
617 }
618
619 for item in unique_new {
620 if item.ends_with("_pkey") {
621 continue;
622 }
623 if item.starts_with("unique_") {
624 comments.push(format!(
625 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
626 options.table_name,
627 item.clone()
628 ));
629 } else {
630 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
631 }
632 }
633 for item in index_new {
634 if item.ends_with("_pkey") {
635 continue;
636 }
637 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
638 }
639
640 if self.params.sql {
641 return JsonValue::from(comments.join(""));
642 }
643
644 if comments.is_empty() {
645 return JsonValue::from(-1);
646 }
647
648 for item in comments.iter() {
649 let (state, res) = self.execute(item.as_str());
650 match state {
651 true => {}
652 false => {
653 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
654 return JsonValue::from(0);
655 }
656 }
657 }
658 JsonValue::from(1)
659 }
660
661 fn table_info(&mut self, table: &str) -> JsonValue {
662 let sql = format!(
663 "SELECT COL.COLUMN_NAME,
664 COL.DATA_TYPE,
665 COL.IS_NULLABLE,
666 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
667 LEFT JOIN
668 pg_catalog.pg_description DESCRIPTION
669 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
670 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
671 let (state, data) = self.query(sql.as_str());
672 let mut list = object! {};
673 if state {
674 for item in data.members() {
675 let mut row = object! {};
676 row["field"] = item["column_name"].clone();
677 row["comment"] = item["comment"].clone();
678 row["type"] = item["data_type"].clone();
679 row["is_nullable"] = item["is_nullable"].clone();
680 if let Some(field_name) = row["field"].as_str() {
681 list[field_name] = row.clone();
682 }
683 }
684 list
685 } else {
686 list
687 }
688 }
689
690 fn table_is_exist(&mut self, name: &str) -> bool {
691 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
692 let (state, data) = self.query(sql.as_str());
693 match state {
694 true => {
695 for item in data.members() {
696 if item.has_key("exists") {
697 return item["exists"].as_bool().unwrap_or(false);
698 }
699 }
700 false
701 }
702 false => false,
703 }
704 }
705
706 fn table(&mut self, name: &str) -> &mut Pgsql {
707 self.params = Params::default(self.connection.mode.str().as_str());
708 let table_name = format!("{}{}", self.connection.prefix, name);
709 if !super::sql_safety::validate_table_name(&table_name) {
710 error!("Invalid table name: {}", name);
711 }
712 self.params.table = table_name.clone();
713 self.params.join_table = table_name;
714 self
715 }
716
    /// Replaces `params.join_table` with `name`, stored exactly as given (the
    /// connection prefix is not added here).
    fn change_table(&mut self, name: &str) -> &mut Self {
        self.params.join_table = name.to_string();
        self
    }
721
    /// Sets the `autoinc` flag on the builder state and returns `self` for chaining.
    fn autoinc(&mut self) -> &mut Self {
        self.params.autoinc = true;
        self
    }
726
    /// Sets the `timestamps` flag on the builder state and returns `self` for chaining.
    fn timestamps(&mut self) -> &mut Self {
        self.params.timestamps = true;
        self
    }
731
    /// Enables SQL preview: terminal methods that check `params.sql` return the
    /// generated SQL text instead of executing it.
    fn fetch_sql(&mut self) -> &mut Self {
        self.params.sql = true;
        self
    }
736
737 fn order(&mut self, field: &str, by: bool) -> &mut Self {
738 self.params.order[field] = {
739 if by {
740 "DESC"
741 } else {
742 "ASC"
743 }
744 }
745 .into();
746 self
747 }
748
749 fn group(&mut self, field: &str) -> &mut Self {
750 let fields: Vec<&str> = field.split(",").collect();
751 for field in fields.iter() {
752 let field = field.to_string();
753 self.params.group[field.as_str()] = field.clone().into();
754 self.params.fields[field.as_str()] = field.clone().into();
755 }
756 self
757 }
758
    /// Sets the `distinct` flag on the builder state and returns `self` for chaining.
    fn distinct(&mut self) -> &mut Self {
        self.params.distinct = true;
        self
    }
763
764 fn json(&mut self, field: &str) -> &mut Self {
765 let list: Vec<&str> = field.split(",").collect();
766 for item in list.iter() {
767 self.params.json[item.to_string().as_str()] = item.to_string().into();
768 }
769 self
770 }
771
772 fn location(&mut self, field: &str) -> &mut Self {
773 let list: Vec<&str> = field.split(",").collect();
774 for item in list.iter() {
775 self.params.location[item.to_string().as_str()] = item.to_string().into();
776 }
777 self
778 }
779
780 fn field(&mut self, field: &str) -> &mut Self {
781 let list: Vec<&str> = field.split(",").collect();
782 let join_table = if self.params.join_table.is_empty() {
783 self.params.table.clone()
784 } else {
785 self.params.join_table.clone()
786 };
787 for item in list.iter() {
788 let lower = item.to_lowercase();
789 let is_expr = lower.contains("count(")
790 || lower.contains("sum(")
791 || lower.contains("avg(")
792 || lower.contains("max(")
793 || lower.contains("min(")
794 || lower.contains("case ");
795 if is_expr {
796 self.params.fields[item.to_string().as_str()] = (*item).into();
797 } else if item.contains(" as ") {
798 let text = item.split(" as ").collect::<Vec<&str>>();
799 self.params.fields[item.to_string().as_str()] =
800 format!("{}.{} as {}", join_table, text[0], text[1]).into();
801 } else {
802 self.params.fields[item.to_string().as_str()] =
803 format!("{join_table}.{item}").into();
804 }
805 }
806 self
807 }
808
    /// Adds `expr` to the selected fields verbatim, keyed by the expression
    /// text itself (no table prefix is applied).
    fn field_raw(&mut self, expr: &str) -> &mut Self {
        self.params.fields[expr] = expr.into();
        self
    }
813
814 fn hidden(&mut self, name: &str) -> &mut Self {
815 let hidden: Vec<&str> = name.split(",").collect();
816
817 let fields_list = self.table_info(self.params.clone().table.as_str());
818 let mut data = array![];
819 for item in fields_list.members() {
820 let _ = data.push(object! {
821 "name":item["field"].as_str().unwrap_or("")
822 });
823 }
824
825 for item in data.members() {
826 let name = item["name"].as_str().unwrap_or("");
827 if !hidden.contains(&name) {
828 self.params.fields[name] = name.into();
829 }
830 }
831 self
832 }
833
834 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
835 for f in field.split('|') {
836 if !super::sql_safety::validate_field_name(f) {
837 error!("Invalid field name: {}", f);
838 }
839 }
840 if !super::sql_safety::validate_compare_orator(compare) {
841 error!("Invalid compare operator: {}", compare);
842 }
843 let join_table = if self.params.join_table.is_empty() {
844 self.params.table.clone()
845 } else {
846 self.params.join_table.clone()
847 };
848 if value.is_boolean() {
849 let bool_val = value.as_bool().unwrap_or(false);
850 self.params
851 .where_and
852 .push(format!("{join_table}.{field} {compare} {bool_val}"));
853 return self;
854 }
855 match compare {
856 "between" => {
857 self.params.where_and.push(format!(
858 "{}.{} between '{}' AND '{}'",
859 join_table, field, value[0], value[1]
860 ));
861 }
862 "set" => {
863 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
864 let mut wheredata = vec![];
865 for item in list.iter() {
866 wheredata.push(format!(
867 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
868 ));
869 }
870 self.params
871 .where_and
872 .push(format!("({})", wheredata.join(" or ")));
873 }
874 "notin" => {
875 let mut text = String::new();
876 for item in value.members() {
877 text = format!("{text},'{item}'");
878 }
879 text = text.trim_start_matches(",").into();
880 self.params
881 .where_and
882 .push(format!("{join_table}.{field} not in ({text})"));
883 }
884 "is" => {
885 self.params
886 .where_and
887 .push(format!("{join_table}.{field} is {value}"));
888 }
889 "isnot" => {
890 self.params
891 .where_and
892 .push(format!("{join_table}.{field} is not {value}"));
893 }
894 "notlike" => {
895 self.params
896 .where_and
897 .push(format!("{join_table}.{field} not like '{value}'"));
898 }
899 "in" => {
900 if value.is_array() && value.is_empty() {
901 self.params.where_and.push("1=0".to_string());
902 return self;
903 }
904 let mut text = String::new();
905 if value.is_array() {
906 for item in value.members() {
907 text = format!("{text},'{item}'");
908 }
909 } else if value.is_null() {
910 text = format!("{text},null");
911 } else {
912 let value = value.as_str().unwrap_or("");
913
914 let value: Vec<&str> = value.split(",").collect();
915 for item in value.iter() {
916 text = format!("{text},'{item}'");
917 }
918 }
919 text = text.trim_start_matches(",").into();
920
921 self.params
922 .where_and
923 .push(format!("{join_table}.{field} {compare} ({text})"));
924 }
925 _ => {
926 self.params
927 .where_and
928 .push(format!("{join_table}.{field} {compare} '{value}'"));
929 }
930 }
931 self
932 }
933
934 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
935 for f in field.split('|') {
936 if !super::sql_safety::validate_field_name(f) {
937 error!("Invalid field name: {}", f);
938 }
939 }
940 if !super::sql_safety::validate_compare_orator(compare) {
941 error!("Invalid compare operator: {}", compare);
942 }
943 let join_table = if self.params.join_table.is_empty() {
944 self.params.table.clone()
945 } else {
946 self.params.join_table.clone()
947 };
948
949 if value.is_boolean() {
950 let bool_val = value.as_bool().unwrap_or(false);
951 self.params
952 .where_or
953 .push(format!("{join_table}.{field} {compare} {bool_val}"));
954 return self;
955 }
956
957 match compare {
958 "between" => {
959 self.params.where_or.push(format!(
960 "{}.{} between '{}' AND '{}'",
961 join_table, field, value[0], value[1]
962 ));
963 }
964 "set" => {
965 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
966 let mut wheredata = vec![];
967 for item in list.iter() {
968 wheredata.push(format!(
969 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
970 ));
971 }
972 self.params
973 .where_or
974 .push(format!("({})", wheredata.join(" or ")));
975 }
976 "notin" => {
977 let mut text = String::new();
978 for item in value.members() {
979 text = format!("{text},'{item}'");
980 }
981 text = text.trim_start_matches(",").into();
982 self.params
983 .where_or
984 .push(format!("{join_table}.{field} not in ({text})"));
985 }
986 "is" => {
987 self.params
988 .where_or
989 .push(format!("{join_table}.{field} is {value}"));
990 }
991 "isnot" => {
992 self.params
993 .where_or
994 .push(format!("{join_table}.{field} is not {value}"));
995 }
996 "in" => {
997 if value.is_array() && value.is_empty() {
998 self.params.where_or.push("1=0".to_string());
999 return self;
1000 }
1001 let mut text = String::new();
1002 if value.is_array() {
1003 for item in value.members() {
1004 text = format!("{text},'{item}'");
1005 }
1006 } else {
1007 let value = value.as_str().unwrap_or("");
1008 let value: Vec<&str> = value.split(",").collect();
1009 for item in value.iter() {
1010 text = format!("{text},'{item}'");
1011 }
1012 }
1013 text = text.trim_start_matches(",").into();
1014 self.params
1015 .where_or
1016 .push(format!("{join_table}.{field} {compare} ({text})"));
1017 }
1018 _ => {
1019 self.params
1020 .where_or
1021 .push(format!("{join_table}.{field} {compare} '{value}'"));
1022 }
1023 }
1024 self
1025 }
1026
    /// Appends `expr` verbatim to the AND-ed conditions; no validation or
    /// quoting is applied.
    fn where_raw(&mut self, expr: &str) -> &mut Self {
        self.params.where_and.push(expr.to_string());
        self
    }
1031
1032 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1033 self.params
1034 .where_and
1035 .push(format!("\"{field}\" IN ({sub_sql})"));
1036 self
1037 }
1038
1039 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1040 self.params
1041 .where_and
1042 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1043 self
1044 }
1045
1046 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1047 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1048 self
1049 }
1050
1051 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1052 self.params
1053 .where_and
1054 .push(format!("NOT EXISTS ({sub_sql})"));
1055 self
1056 }
1057
1058 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1059 self.params.where_column = format!(
1060 "{}.{} {} {}.{}",
1061 self.params.table, field_a, compare, self.params.table, field_b
1062 );
1063 self
1064 }
1065
1066 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1067 self.params
1068 .update_column
1069 .push(format!("{field_a} = {compare}"));
1070 self
1071 }
1072
    /// Stores the page number and page size in `params.page` / `params.limit`.
    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
        self.params.page = page;
        self.params.limit = limit;
        self
    }
1078
    /// Stores `count` in `params.limit_only` (kept separate from the page size
    /// set by `page`).
    fn limit(&mut self, count: i32) -> &mut Self {
        self.params.limit_only = count;
        self
    }
1083
1084 fn column(&mut self, field: &str) -> JsonValue {
1085 self.field(field);
1086 let sql = self.params.select_sql();
1087
1088 if self.params.sql {
1089 return JsonValue::from(sql);
1090 }
1091 let (state, data) = self.query(sql.as_str());
1092 match state {
1093 true => {
1094 let mut list = array![];
1095 for item in data.members() {
1096 if self.params.json[field].is_empty() {
1097 let _ = list.push(item[field].clone());
1098 } else {
1099 let data =
1100 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1101 let _ = list.push(data);
1102 }
1103 }
1104 list
1105 }
1106 false => {
1107 array![]
1108 }
1109 }
1110 }
1111
1112 fn count(&mut self) -> JsonValue {
1113 self.params.fields = json::object! {};
1114 self.params.fields["count"] = "count(*) as count".to_string().into();
1115 let sql = self.params.select_sql();
1116 if self.params.sql {
1117 return JsonValue::from(sql.clone());
1118 }
1119 let (state, data) = self.query(sql.as_str());
1120 if state {
1121 data[0]["count"].clone()
1122 } else {
1123 JsonValue::from(0)
1124 }
1125 }
1126
1127 fn max(&mut self, field: &str) -> JsonValue {
1128 self.params.fields[field] = format!("max({field}) as {field}").into();
1129 let sql = self.params.select_sql();
1130 if self.params.sql {
1131 return JsonValue::from(sql.clone());
1132 }
1133 let (state, data) = self.query(sql.as_str());
1134 if state {
1135 if data.len() > 1 {
1136 return data.clone();
1137 }
1138 data[0][field].clone()
1139 } else {
1140 JsonValue::from(0)
1141 }
1142 }
1143
1144 fn min(&mut self, field: &str) -> JsonValue {
1145 self.params.fields[field] = format!("min({field}) as {field}").into();
1146 let sql = self.params.select_sql();
1147 if self.params.sql {
1148 return JsonValue::from(sql.clone());
1149 }
1150 let (state, data) = self.query(sql.as_str());
1151 if state {
1152 if data.len() > 1 {
1153 return data;
1154 }
1155 data[0][field].clone()
1156 } else {
1157 JsonValue::from(0)
1158 }
1159 }
1160
1161 fn sum(&mut self, field: &str) -> JsonValue {
1162 self.params.fields[field] = format!("sum({field}) as {field}").into();
1163 let sql = self.params.select_sql();
1164 if self.params.sql {
1165 return JsonValue::from(sql.clone());
1166 }
1167 let (state, data) = self.query(sql.as_str());
1168 match state {
1169 true => {
1170 if data.len() > 1 {
1171 return data;
1172 }
1173 data[0][field].clone()
1174 }
1175 false => JsonValue::from(0),
1176 }
1177 }
1178
1179 fn avg(&mut self, field: &str) -> JsonValue {
1180 self.params.fields[field] = format!("avg({field}) as {field}").into();
1181 let sql = self.params.select_sql();
1182 if self.params.sql {
1183 return JsonValue::from(sql.clone());
1184 }
1185 let (state, data) = self.query(sql.as_str());
1186 if state {
1187 if data.len() > 1 {
1188 return data;
1189 }
1190 data[0][field].clone()
1191 } else {
1192 JsonValue::from(0)
1193 }
1194 }
1195
1196 fn having(&mut self, expr: &str) -> &mut Self {
1197 self.params.having.push(expr.to_string());
1198 self
1199 }
1200
1201 fn select(&mut self) -> JsonValue {
1202 let sql = self.params.select_sql();
1203 if self.params.sql {
1204 return JsonValue::from(sql.clone());
1205 }
1206 let (state, mut data) = self.query(sql.as_str());
1207 match state {
1208 true => {
1209 for (field, _) in self.params.json.entries() {
1210 for item in data.members_mut() {
1211 if !item[field].is_empty() {
1212 let json = item[field].to_string();
1213 item[field] = match json::parse(&json) {
1214 Ok(e) => e,
1215 Err(_) => JsonValue::from(json),
1216 };
1217 }
1218 }
1219 }
1220 data.clone()
1221 }
1222 false => array![],
1223 }
1224 }
1225
1226 fn find(&mut self) -> JsonValue {
1227 self.params.page = 1;
1228 self.params.limit = 1;
1229 let sql = self.params.select_sql();
1230 if self.params.sql {
1231 return JsonValue::from(sql.clone());
1232 }
1233 let (state, mut data) = self.query(sql.as_str());
1234 match state {
1235 true => {
1236 if data.is_empty() {
1237 return object! {};
1238 }
1239 for (field, _) in self.params.json.entries() {
1240 if !data[0][field].is_empty() {
1241 let json = data[0][field].to_string();
1242 let json = json::parse(&json).unwrap_or(array![]);
1243 data[0][field] = json;
1244 } else {
1245 data[0][field] = array![];
1246 }
1247 }
1248 data[0].clone()
1249 }
1250 false => {
1251 object! {}
1252 }
1253 }
1254 }
1255
1256 fn value(&mut self, field: &str) -> JsonValue {
1257 self.params.fields = object! {};
1258 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1259 self.params.page = 1;
1260 self.params.limit = 1;
1261 let sql = self.params.select_sql();
1262 if self.params.sql {
1263 return JsonValue::from(sql.clone());
1264 }
1265 let (state, mut data) = self.query(sql.as_str());
1266 match state {
1267 true => {
1268 for (field, _) in self.params.json.entries() {
1269 if !data[0][field].is_empty() {
1270 let json = data[0][field].to_string();
1271 let json = json::parse(&json).unwrap_or(array![]);
1272 data[0][field] = json;
1273 } else {
1274 data[0][field] = array![];
1275 }
1276 }
1277 data[0][field].clone()
1278 }
1279 false => {
1280 if self.connection.debug {
1281 info!("{data:?}");
1282 }
1283 JsonValue::Null
1284 }
1285 }
1286 }
1287
1288 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1289 let fields_list = self.table_info(&self.params.table.clone());
1290 let mut fields = vec![];
1291 let mut values = vec![];
1292 if !self.params.autoinc && data["id"].is_empty() {
1293 let thread_id = format!("{:?}", std::thread::current().id());
1294 let thread_num: u64 = thread_id
1295 .trim_start_matches("ThreadId(")
1296 .trim_end_matches(")")
1297 .parse()
1298 .unwrap_or(0);
1299 data["id"] = format!(
1300 "{:X}{:X}",
1301 Local::now().timestamp_nanos_opt().unwrap_or(0),
1302 thread_num
1303 )
1304 .into();
1305 }
1306 for (field, value) in data.entries() {
1307 fields.push(format!("\"{}\"", field));
1308
1309 if value.is_string() {
1310 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1311 continue;
1312 } else if value.is_array() {
1313 if self.params.json[field].is_empty() {
1314 let array = value
1315 .members()
1316 .map(|x| x.as_str().unwrap_or(""))
1317 .collect::<Vec<&str>>()
1318 .join(",");
1319 values.push(format!("'{array}'"));
1320 } else {
1321 let json = value.to_string();
1322 let json = json.replace("'", "''");
1323 values.push(format!("'{json}'"));
1324 }
1325 continue;
1326 } else if value.is_object() {
1327 if self.params.json[field].is_empty() {
1328 values.push(format!("'{value}'"));
1329 } else {
1330 let json = value.to_string();
1331 let json = json.replace("'", "''");
1332 values.push(format!("'{json}'"));
1333 }
1334 continue;
1335 } else if value.is_number() {
1336 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1337 if col_type == "boolean" {
1338 let bool_val = value.as_i64().unwrap_or(0) != 0;
1339 values.push(format!("{bool_val}"));
1340 } else if col_type.contains("int") {
1341 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1342 } else {
1343 values.push(format!("{value}"));
1344 }
1345 continue;
1346 } else if value.is_boolean() || value.is_null() {
1347 values.push(format!("{value}"));
1348 continue;
1349 } else {
1350 values.push(format!("'{value}'"));
1351 continue;
1352 }
1353 }
1354 let fields = fields.join(",");
1355 let values = values.join(",");
1356
1357 let sql = format!(
1358 "INSERT INTO {} ({}) VALUES ({});",
1359 self.params.table, fields, values
1360 );
1361 if self.params.sql {
1362 return JsonValue::from(sql.clone());
1363 }
1364 let (state, ids) = self.execute(sql.as_str());
1365
1366 match state {
1367 true => match self.params.autoinc {
1368 true => ids.clone(),
1369 false => data["id"].clone(),
1370 },
1371 false => {
1372 let thread_id = format!("{:?}", thread::current().id());
1373 error!("添加失败: {thread_id} {ids:?} {sql}");
1374 JsonValue::from("")
1375 }
1376 }
1377 }
1378 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1379 let fields_list = self.table_info(&self.params.table.clone());
1380 let mut fields = String::new();
1381 if !self.params.autoinc && data[0]["id"].is_empty() {
1382 data[0]["id"] = "".into();
1383 }
1384 for (field, _) in data[0].entries() {
1385 fields = format!("{fields},\"{field}\"");
1386 }
1387 fields = fields.trim_start_matches(",").to_string();
1388
1389 let core_count = num_cpus::get();
1390 let mut p = pools::Pool::new(core_count * 4);
1391
1392 let autoinc = self.params.autoinc;
1393 for list in data.members() {
1394 let mut item = list.clone();
1395 let i = br_fields::str::Code::verification_code(3);
1396 let fields_list_new = fields_list.clone();
1397 p.execute(move |pcindex| {
1398 if !autoinc && item["id"].is_empty() {
1399 let id = format!(
1400 "{:X}{:X}{}",
1401 Local::now().timestamp_nanos_opt().unwrap_or(0),
1402 pcindex,
1403 i
1404 );
1405 item["id"] = id.into();
1406 }
1407 let mut values = "".to_string();
1408 for (field, value) in item.entries() {
1409 if value.is_string() {
1410 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1411 } else if value.is_number() {
1412 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1413 if col_type == "boolean" {
1414 let bool_val = value.as_i64().unwrap_or(0) != 0;
1415 values = format!("{values},{bool_val}");
1416 } else if col_type.contains("int") {
1417 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1418 } else {
1419 values = format!("{values},{value}");
1420 }
1421 } else if value.is_boolean() {
1422 values = format!("{values},{value}");
1423 continue;
1424 } else {
1425 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1426 }
1427 }
1428 values = format!("({})", values.trim_start_matches(","));
1429 array![item["id"].clone(), values]
1430 });
1431 }
1432 let (ids_list, mut values) = p.insert_all();
1433 values = values.trim_start_matches(",").to_string();
1434 let sql = format!(
1435 "INSERT INTO {} ({}) VALUES {};",
1436 self.params.table, fields, values
1437 );
1438
1439 if self.params.sql {
1440 return JsonValue::from(sql.clone());
1441 }
1442 let (state, data) = self.execute(sql.as_str());
1443 match state {
1444 true => match autoinc {
1445 true => data,
1446 false => JsonValue::from(ids_list),
1447 },
1448 false => {
1449 error!("insert_all: {data:?}");
1450 array![]
1451 }
1452 }
1453 }
1454 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1455 let fields_list = self.table_info(&self.params.table.clone());
1456 let mut fields = vec![];
1457 let mut values = vec![];
1458 if !self.params.autoinc && data["id"].is_empty() {
1459 let thread_id = format!("{:?}", std::thread::current().id());
1460 let thread_num: u64 = thread_id
1461 .trim_start_matches("ThreadId(")
1462 .trim_end_matches(")")
1463 .parse()
1464 .unwrap_or(0);
1465 data["id"] = format!(
1466 "{:X}{:X}",
1467 Local::now().timestamp_nanos_opt().unwrap_or(0),
1468 thread_num
1469 )
1470 .into();
1471 }
1472 for (field, value) in data.entries() {
1473 fields.push(format!("\"{}\"", field));
1474
1475 if value.is_string() {
1476 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1477 continue;
1478 } else if value.is_array() {
1479 if self.params.json[field].is_empty() {
1480 let array = value
1481 .members()
1482 .map(|x| x.as_str().unwrap_or(""))
1483 .collect::<Vec<&str>>()
1484 .join(",");
1485 values.push(format!("'{array}'"));
1486 } else {
1487 let json = value.to_string();
1488 let json = json.replace("'", "''");
1489 values.push(format!("'{json}'"));
1490 }
1491 continue;
1492 } else if value.is_object() {
1493 if self.params.json[field].is_empty() {
1494 values.push(format!("'{value}'"));
1495 } else {
1496 let json = value.to_string();
1497 let json = json.replace("'", "''");
1498 values.push(format!("'{json}'"));
1499 }
1500 continue;
1501 } else if value.is_number() {
1502 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1503 if col_type == "boolean" {
1504 let bool_val = value.as_i64().unwrap_or(0) != 0;
1505 values.push(format!("{bool_val}"));
1506 } else if col_type.contains("int") {
1507 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1508 } else {
1509 values.push(format!("{value}"));
1510 }
1511 continue;
1512 } else if value.is_boolean() || value.is_null() {
1513 values.push(format!("{value}"));
1514 continue;
1515 } else {
1516 values.push(format!("'{value}'"));
1517 continue;
1518 }
1519 }
1520
1521 let conflict_cols: Vec<String> = conflict_fields
1522 .iter()
1523 .map(|f| format!("\"{}\"", f))
1524 .collect();
1525
1526 let update_set: Vec<String> = fields
1527 .iter()
1528 .filter(|f| {
1529 let name = f.trim_matches('"');
1530 !conflict_fields.contains(&name) && name != "id"
1531 })
1532 .map(|f| format!("{f}=EXCLUDED.{f}"))
1533 .collect();
1534
1535 let fields_str = fields.join(",");
1536 let values_str = values.join(",");
1537
1538 let sql = format!(
1539 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1540 self.params.table,
1541 fields_str,
1542 values_str,
1543 conflict_cols.join(","),
1544 update_set.join(",")
1545 );
1546 if self.params.sql {
1547 return JsonValue::from(sql.clone());
1548 }
1549 let (state, result) = self.execute(sql.as_str());
1550 match state {
1551 true => match self.params.autoinc {
1552 true => result.clone(),
1553 false => data["id"].clone(),
1554 },
1555 false => {
1556 let thread_id = format!("{:?}", thread::current().id());
1557 error!("upsert失败: {thread_id} {result:?} {sql}");
1558 JsonValue::from("")
1559 }
1560 }
1561 }
1562 fn update(&mut self, data: JsonValue) -> JsonValue {
1563 let fields_list = self.table_info(&self.params.table.clone());
1564 let mut values = vec![];
1565 for (field, value) in data.entries() {
1566 if value.is_string() {
1567 values.push(format!(
1568 "\"{}\"='{}'",
1569 field,
1570 value.to_string().replace("'", "''")
1571 ));
1572 } else if value.is_number() {
1573 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1574 if col_type == "boolean" {
1575 let bool_val = value.as_i64().unwrap_or(0) != 0;
1576 values.push(format!("\"{field}\"= {bool_val}"));
1577 } else if col_type.contains("int") {
1578 values.push(format!(
1579 "\"{}\"= {}",
1580 field,
1581 value.as_f64().unwrap_or(0.0) as i64
1582 ));
1583 } else {
1584 values.push(format!("\"{field}\"= {value}"));
1585 }
1586 } else if value.is_array() {
1587 if self.params.json[field].is_empty() {
1588 let array = value
1589 .members()
1590 .map(|x| x.as_str().unwrap_or(""))
1591 .collect::<Vec<&str>>()
1592 .join(",");
1593 values.push(format!("\"{field}\"='{array}'"));
1594 } else {
1595 let json = value.to_string();
1596 let json = json.replace("'", "''");
1597 values.push(format!("\"{field}\"='{json}'"));
1598 }
1599 continue;
1600 } else if value.is_object() {
1601 if self.params.json[field].is_empty() {
1602 values.push(format!("\"{field}\"='{value}'"));
1603 } else {
1604 if value.is_empty() {
1605 values.push(format!("\"{field}\"=''"));
1606 continue;
1607 }
1608 let json = value.to_string();
1609 let json = json.replace("'", "''");
1610 values.push(format!("\"{field}\"='{json}'"));
1611 }
1612 continue;
1613 } else if value.is_boolean() {
1614 values.push(format!("\"{field}\"= {value}"));
1615 } else {
1616 values.push(format!("\"{field}\"=\"{value}\""));
1617 }
1618 }
1619
1620 for (field, value) in self.params.inc_dec.entries() {
1621 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1622 }
1623 if !self.params.update_column.is_empty() {
1624 values.extend(self.params.update_column.clone());
1625 }
1626 let values = values.join(",");
1627
1628 let sql = format!(
1629 "UPDATE {} SET {} {};",
1630 self.params.table.clone(),
1631 values,
1632 self.params.where_sql()
1633 );
1634 if self.params.sql {
1635 return JsonValue::from(sql.clone());
1636 }
1637 let (state, data) = self.execute(sql.as_str());
1638 if state {
1639 data
1640 } else {
1641 let thread_id = format!("{:?}", thread::current().id());
1642 error!("update: {thread_id} {data:?} {sql}");
1643 0.into()
1644 }
1645 }
1646 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1647 let fields_list = self.table_info(&self.params.table.clone());
1648 let mut values = vec![];
1649
1650 let mut ids = vec![];
1651 for (field, _) in data[0].entries() {
1652 if field == "id" {
1653 continue;
1654 }
1655 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1656 let mut fields = vec![];
1657 for row in data.members() {
1658 let value = row[field].clone();
1659 let id = row["id"].clone();
1660 ids.push(id.clone());
1661 if value.is_string() {
1662 fields.push(format!(
1663 "WHEN '{}' THEN '{}'",
1664 id,
1665 value.to_string().replace("'", "''")
1666 ));
1667 } else if value.is_array() || value.is_object() {
1668 if self.params.json[field].is_empty() {
1669 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1670 } else {
1671 let json = value.to_string();
1672 let json = json.replace("'", "''");
1673 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1674 }
1675 continue;
1676 } else if value.is_number() {
1677 if col_type == "boolean" {
1678 let bool_val = value.as_i64().unwrap_or(0) != 0;
1679 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1680 } else {
1681 fields.push(format!("WHEN '{id}' THEN {value}"));
1682 }
1683 } else if value.is_boolean() || value.is_null() {
1684 fields.push(format!("WHEN '{id}' THEN {value}"));
1685 } else {
1686 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1687 }
1688 }
1689 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1690 }
1691 self.where_and("id", "in", ids.into());
1692 for (field, value) in self.params.inc_dec.entries() {
1693 values.push(format!("{} = {}", field, value.to_string().clone()));
1694 }
1695
1696 let values = values.join(",");
1697 let sql = format!(
1698 "UPDATE {} SET {} {} {};",
1699 self.params.table.clone(),
1700 values,
1701 self.params.where_sql(),
1702 self.params.page_limit_sql()
1703 );
1704 if self.params.sql {
1705 return JsonValue::from(sql.clone());
1706 }
1707 let (state, data) = self.execute(sql.as_str());
1708 if state {
1709 data
1710 } else {
1711 error!("update_all: {data:?}");
1712 JsonValue::from(0)
1713 }
1714 }
1715
1716 fn delete(&mut self) -> JsonValue {
1717 let sql = format!(
1718 "delete FROM {} {} {};",
1719 self.params.table.clone(),
1720 self.params.where_sql(),
1721 self.params.page_limit_sql()
1722 );
1723 if self.params.sql {
1724 return JsonValue::from(sql.clone());
1725 }
1726 let (state, data) = self.execute(sql.as_str());
1727 match state {
1728 true => data,
1729 false => {
1730 error!("delete 失败>>> {data:?}");
1731 JsonValue::from(0)
1732 }
1733 }
1734 }
1735
1736 fn transaction(&mut self) -> bool {
1737 let thread_id = format!("{:?}", thread::current().id());
1738 let key = format!("{}{}", self.default, thread_id);
1739
1740 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1741 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1742 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1743 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1744 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1745 return true;
1746 }
1747
1748 let mut conn = match self.client.get_connect_for_transaction() {
1749 Ok(c) => c,
1750 Err(e) => {
1751 error!("获取事务连接失败: {e}");
1752 return false;
1753 }
1754 };
1755
1756 if !conn.is_valid() {
1757 error!("事务连接无效");
1758 self.client.release_transaction_conn();
1759 return false;
1760 }
1761
1762 if let Err(e) = conn.execute("START TRANSACTION") {
1763 error!("启动事务失败: {e}");
1764 self.client.release_transaction_conn();
1765 return false;
1766 }
1767
1768 if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1769 error!("设置事务隔离级别失败: {e}");
1770 let _ = conn.execute("ROLLBACK");
1771 self.client.release_transaction_conn();
1772 return false;
1773 }
1774
1775 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1776 true
1777 }
1778 fn commit(&mut self) -> bool {
1779 let thread_id = format!("{:?}", thread::current().id());
1780 let key = format!("{}{}", self.default, thread_id);
1781
1782 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1783 error!("commit: 没有活跃的事务");
1784 return false;
1785 }
1786
1787 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1788 if depth > 1 {
1789 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1790 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1791 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1792 return true;
1793 }
1794
1795 let commit_result =
1796 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1797
1798 let success = match commit_result {
1799 Some(Ok(_)) => true,
1800 Some(Err(e)) => {
1801 error!("提交事务失败: {e}");
1802 false
1803 }
1804 None => {
1805 error!("提交事务失败: 未找到连接");
1806 false
1807 }
1808 };
1809
1810 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1811 self.client.release_transaction_conn();
1812 success
1813 }
1814
1815 fn rollback(&mut self) -> bool {
1816 let thread_id = format!("{:?}", thread::current().id());
1817 let key = format!("{}{}", self.default, thread_id);
1818
1819 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1820 error!("rollback: 没有活跃的事务");
1821 return false;
1822 }
1823
1824 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1825 if depth > 1 {
1826 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1827 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1828 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1829 return true;
1830 }
1831
1832 let rollback_result =
1833 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1834
1835 let success = match rollback_result {
1836 Some(Ok(_)) => true,
1837 Some(Err(e)) => {
1838 error!("回滚失败: {e}");
1839 false
1840 }
1841 None => {
1842 error!("回滚失败: 未找到连接");
1843 false
1844 }
1845 };
1846
1847 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1848 self.client.release_transaction_conn();
1849 success
1850 }
1851
1852 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1853 let (state, data) = self.query(sql);
1854 match state {
1855 true => Ok(data),
1856 false => Err(data.to_string()),
1857 }
1858 }
1859
1860 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1861 let (state, data) = self.execute(sql);
1862 match state {
1863 true => Ok(data),
1864 false => Err(data.to_string()),
1865 }
1866 }
1867
1868 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1869 self.params.inc_dec[field] = format!("{field} + {num}").into();
1870 self
1871 }
1872
1873 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1874 self.params.inc_dec[field] = format!("{field} - {num}").into();
1875 self
1876 }
1877 fn buildsql(&mut self) -> String {
1878 self.fetch_sql();
1879 let sql = self.select().to_string();
1880 format!("( {} ) {}", sql, self.params.table)
1881 }
1882
1883 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1884 for field in fields {
1885 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1886 }
1887 self
1888 }
1889
1890 fn join(
1891 &mut self,
1892 main_table: &str,
1893 main_fields: &str,
1894 right_table: &str,
1895 right_fields: &str,
1896 ) -> &mut Self {
1897 let main_table = if main_table.is_empty() {
1898 self.params.table.clone()
1899 } else {
1900 main_table.to_string()
1901 };
1902 self.params.join_table = right_table.to_string();
1903 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1904 self
1905 }
1906
1907 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1908 let main_fields = if main_fields.is_empty() {
1909 "id"
1910 } else {
1911 main_fields
1912 };
1913 let second_fields = if second_fields.is_empty() {
1914 self.params.table.clone()
1915 } else {
1916 second_fields.to_string().clone()
1917 };
1918 let sec_table_name = format!("{}{}", table, "_2");
1919 let second_table = format!("{} {}", table, sec_table_name.clone());
1920 self.params.join_table = sec_table_name.clone();
1921 self.params.join.push(format!(
1922 " INNER JOIN {} ON {}.{} = {}.{}",
1923 second_table, self.params.table, main_fields, sec_table_name, second_fields
1924 ));
1925 self
1926 }
1927
1928 fn join_right(
1929 &mut self,
1930 main_table: &str,
1931 main_fields: &str,
1932 right_table: &str,
1933 right_fields: &str,
1934 ) -> &mut Self {
1935 let main_table = if main_table.is_empty() {
1936 self.params.table.clone()
1937 } else {
1938 main_table.to_string()
1939 };
1940 self.params.join_table = right_table.to_string();
1941 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1942 self
1943 }
1944
1945 fn join_full(
1946 &mut self,
1947 main_table: &str,
1948 main_fields: &str,
1949 right_table: &str,
1950 right_fields: &str,
1951 ) -> &mut Self {
1952 let main_table = if main_table.is_empty() {
1953 self.params.table.clone()
1954 } else {
1955 main_table.to_string()
1956 };
1957 self.params.join_table = right_table.to_string();
1958 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1959 self
1960 }
1961
1962 fn union(&mut self, sub_sql: &str) -> &mut Self {
1963 self.params.unions.push(format!("UNION {sub_sql}"));
1964 self
1965 }
1966
1967 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1968 self.params.unions.push(format!("UNION ALL {sub_sql}"));
1969 self
1970 }
1971
1972 fn lock_for_update(&mut self) -> &mut Self {
1973 self.params.lock_mode = "FOR UPDATE".to_string();
1974 self
1975 }
1976
1977 fn lock_for_share(&mut self) -> &mut Self {
1978 self.params.lock_mode = "FOR SHARE".to_string();
1979 self
1980 }
1981}