1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use chrono::Local;
7use json::{array, object, JsonValue};
8use log::{error, info};
9use std::thread;
/// PostgreSQL backend: bundles the connection settings, the query-builder
/// state and the `br_pgsql` connection pool used to run statements.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this instance was created from (see `connect`).
    pub connection: Connection,
    /// Name that is combined with the current thread id to form the key used
    /// to look up an active transaction.
    pub default: String,
    /// Query-builder state; replaced with fresh defaults by `table`.
    pub params: Params,
    /// Connection pool obtained from `br_pgsql`, used for statements that run
    /// outside a transaction.
    pub client: Pools,
}
19
20impl Pgsql {
21 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
22 let port = connection
23 .hostport
24 .parse::<i32>()
25 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
26
27 let cp_connection = connection.clone();
28 let config = object! {
29 debug: cp_connection.debug,
30 username: cp_connection.username,
31 userpass: cp_connection.userpass,
32 database: cp_connection.database,
33 hostname: cp_connection.hostname,
34 hostport: port,
35 charset: cp_connection.charset.str(),
36 };
37 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
38
39 let pools = pgsql.pools()?;
40 Ok(Self {
41 connection,
42 default: default.clone(),
43 params: Params::default("pgsql"),
44 client: pools,
45 })
46 }
47
48 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
49 let thread_id = format!("{:?}", thread::current().id());
50 let key = format!("{}{}", self.default, thread_id);
51
52 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
53 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
54
55 match result {
56 Some(Ok(e)) => {
57 if self.connection.debug {
58 info!("查询成功: {} {}", thread_id.clone(), sql);
59 }
60 (true, e.rows)
61 }
62 Some(Err(e)) => {
63 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
64 (false, JsonValue::from(e.to_string()))
65 }
66 None => {
67 error!("事务查询失败: 未找到事务连接 {thread_id}");
68 (false, JsonValue::from("未找到事务连接"))
69 }
70 }
71 } else {
72 let mut guard = match self.client.get_guard() {
73 Ok(g) => g,
74 Err(e) => {
75 error!(
76 "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
77 );
78 return (false, JsonValue::from(e.to_string()));
79 }
80 };
81
82 let res = guard.conn().query(sql);
83 match res {
84 Ok(e) => {
85 if self.connection.debug {
86 info!("查询成功: {} {}", thread_id.clone(), sql);
87 }
88 (true, e.rows)
89 }
90 Err(e) => {
91 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
92 (false, JsonValue::from(e.to_string()))
93 }
94 }
95 }
96 }
97 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
98 let thread_id = format!("{:?}", thread::current().id());
99 let key = format!("{}{}", self.default, thread_id);
100
101 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
102 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
103
104 match result {
105 Some(Ok(e)) => {
106 if self.connection.debug {
107 info!("提交成功: {} {}", thread_id.clone(), sql);
108 }
109 if sql.contains("INSERT") {
110 (true, e.rows)
111 } else {
112 (true, e.affect_count.into())
113 }
114 }
115 Some(Err(e)) => {
116 error!("事务提交失败: {thread_id} {e} {sql}");
117 (false, JsonValue::from(e.to_string()))
118 }
119 None => {
120 error!("事务执行失败: 未找到事务连接 {thread_id}");
121 (false, JsonValue::from("未找到事务连接"))
122 }
123 }
124 } else {
125 let mut guard = match self.client.get_guard() {
126 Ok(g) => g,
127 Err(e) => {
128 error!(
129 "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
130 );
131 return (false, JsonValue::from(e.to_string()));
132 }
133 };
134
135 let res = guard.conn().execute(sql);
136 match res {
137 Ok(e) => {
138 if self.connection.debug {
139 info!("提交成功: {} {}", thread_id.clone(), sql);
140 }
141 if sql.contains("INSERT") {
142 (true, e.rows)
143 } else {
144 (true, e.affect_count.into())
145 }
146 }
147 Err(e) => {
148 error!("非事务提交失败: {thread_id} {e} {sql}");
149 (false, JsonValue::from(e.to_string()))
150 }
151 }
152 }
153 }
154}
155
156impl DbMode for Pgsql {
157 fn database_tables(&mut self) -> JsonValue {
158 let sql = "SHOW TABLES".to_string();
159 match self.sql(sql.as_str()) {
160 Ok(e) => {
161 let mut list = vec![];
162 for item in e.members() {
163 for (_, value) in item.entries() {
164 list.push(value.clone());
165 }
166 }
167 list.into()
168 }
169 Err(_) => {
170 array![]
171 }
172 }
173 }
174
175 fn database_create(&mut self, name: &str) -> bool {
176 let sql = format!("CREATE DATABASE {name}");
177
178 let (state, data) = self.execute(sql.as_str());
179 match state {
180 true => data.as_bool().unwrap_or(true),
181 false => {
182 error!("创建数据库失败: {data:?}");
183 false
184 }
185 }
186 }
187
188 fn truncate(&mut self, table: &str) -> bool {
189 let sql = format!("TRUNCATE TABLE {table}");
190 let (state, _) = self.execute(sql.as_str());
191 state
192 }
193}
194
195impl Mode for Pgsql {
196 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
197 let mut sql = String::new();
198 let mut comments = vec![];
199
200 if !options.table_unique.is_empty() {
201 let full_name = format!(
202 "{}_unique_{}",
203 options.table_name,
204 options.table_unique.join("_")
205 );
206 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
207 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
208 let unique = format!(
209 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
210 name,
211 options.table_name,
212 options.table_unique.join(",")
213 );
214 comments.push(unique);
215 }
216
217 for row in options.table_index.iter() {
218 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
219 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
220 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
221 let index = format!(
222 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
223 name,
224 options.table_name,
225 row.join(",")
226 );
227 comments.push(index);
228 }
229
230 for (name, field) in options.table_fields.entries_mut() {
231 field["table_name"] = options.table_name.clone().into();
232 let row = br_fields::field("pgsql", name, field.clone());
233 let (col_sql, meta) = if let Some(idx) = row.find("--") {
234 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
235 } else {
236 (row.trim(), None)
237 };
238 if let Some(meta) = meta {
239 comments.push(format!(
240 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
241 options.table_name, name, meta
242 ));
243 }
244 sql = format!("{} {},\r\n", sql, col_sql);
245 }
246
247 let primary_key = format!(
248 "CONSTRAINT {}_{} PRIMARY KEY ({})",
249 options.table_name, options.table_key, options.table_key
250 );
251 let sql = format!(
252 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
253 options.table_name,
254 sql.trim_end_matches(",\r\n"),
255 primary_key
256 );
257 comments.insert(0, sql);
258
259 for (_name, field) in options.table_fields.entries() {
260 let _ = field["mode"].as_str();
261 }
262
263 if self.params.sql {
264 let info = comments.join("\r\n");
265 return JsonValue::from(info);
266 }
267 for comment in comments {
268 let (state, _) = self.execute(comment.as_str());
269 match state {
270 true => {}
271 false => {
272 return JsonValue::from(state);
273 }
274 }
275 }
276 JsonValue::from(true)
277 }
278
279 fn table_update(&mut self, options: TableOptions) -> JsonValue {
280 let fields_list = self.table_info(&options.table_name);
281 let mut put = vec![];
282 let mut add = vec![];
283 let mut del = vec![];
284 let mut comments = vec![];
285
286 for (key, _) in fields_list.entries() {
287 if options.table_fields[key].is_empty() {
288 del.push(key);
289 }
290 }
291 for (name, field) in options.table_fields.entries() {
292 if !fields_list[name].is_empty() {
293 let old_info = &fields_list[name];
294 let new_field_sql = br_fields::field("pgsql", name, field.clone());
295
296 let old_comment = old_info["comment"].as_str().unwrap_or("");
297 let old_type = old_info["type"].as_str().unwrap_or("");
298
299 let new_comment = if let Some(idx) = new_field_sql.find("--") {
300 new_field_sql[idx + 2..].trim()
301 } else {
302 ""
303 };
304
305 let comment_matches =
306 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
307 let old_parts: Vec<&str> = old_comment.split('|').collect();
308 let new_parts: Vec<&str> = new_comment.split('|').collect();
309 if old_parts.len() >= 4 && new_parts.len() >= 4 {
310 old_parts[..4] == new_parts[..4]
311 } else {
312 old_comment == new_comment
313 }
314 } else if !old_comment.is_empty() && !new_comment.is_empty() {
315 let old_parts: Vec<&str> = old_comment.split('|').collect();
316 let new_parts: Vec<&str> = new_comment.split('|').collect();
317 if old_parts.len() >= 2
318 && new_parts.len() >= 2
319 && old_parts.len() == new_parts.len()
320 {
321 let old_filtered: Vec<&str> = old_parts
322 .iter()
323 .filter(|v| **v != "true" && **v != "false")
324 .copied()
325 .collect();
326 let new_filtered: Vec<&str> = new_parts
327 .iter()
328 .filter(|v| **v != "true" && **v != "false")
329 .copied()
330 .collect();
331 old_filtered == new_filtered
332 } else {
333 old_comment == new_comment
334 }
335 } else {
336 old_comment == new_comment
337 };
338
339 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
340 let new_type = if sql_parts.len() > 1 {
341 sql_parts[1].to_lowercase()
342 } else {
343 String::new()
344 };
345
346 let type_matches = match old_type {
347 "integer" => {
348 new_type.contains("int")
349 && !new_type.contains("bigint")
350 && !new_type.contains("smallint")
351 }
352 "bigint" => new_type.contains("bigint"),
353 "smallint" => new_type.contains("smallint"),
354 "boolean" => new_type.contains("boolean"),
355 "text" => new_type.contains("text"),
356 "character varying" => new_type.contains("varchar"),
357 "character" => new_type.contains("char") && !new_type.contains("varchar"),
358 "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
359 "double precision" => {
360 new_type.contains("double") || new_type.contains("float8")
361 }
362 "real" => new_type.contains("real") || new_type.contains("float4"),
363 "timestamp without time zone" | "timestamp with time zone" => {
364 new_type.contains("timestamp")
365 }
366 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
367 "time without time zone" | "time with time zone" => {
368 new_type.contains("time") && !new_type.contains("timestamp")
369 }
370 "json" | "jsonb" => new_type.contains("json"),
371 "uuid" => new_type.contains("uuid"),
372 "bytea" => new_type.contains("bytea"),
373 _ => old_type == new_type,
374 };
375
376 if type_matches && comment_matches {
377 continue;
378 }
379
380 log::debug!(
381 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
382 options.table_name,
383 name,
384 type_matches,
385 old_type,
386 new_type,
387 comment_matches
388 );
389 put.push(name);
390 } else {
391 add.push(name);
392 }
393 }
394
395 for name in add.iter() {
396 let name = name.to_string();
397 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
398 let rows = row.split("--").collect::<Vec<&str>>();
399 comments.push(format!(
400 r#"ALTER TABLE "{}" add {};"#,
401 options.table_name,
402 rows[0].trim()
403 ));
404 if rows.len() > 1 {
405 comments.push(format!(
406 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
407 options.table_name,
408 name,
409 rows[1].trim()
410 ));
411 }
412 }
413 for name in del.iter() {
414 comments.push(format!(
415 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
416 options.table_name, name
417 ));
418 }
419 for name in put.iter() {
420 let name = name.to_string();
421 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
422 let rows = row.split("--").collect::<Vec<&str>>();
423
424 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
425
426 if sql[1].contains("BOOLEAN") {
427 let text = format!(
428 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
429 options.table_name, name
430 );
431 comments.push(text.clone());
432 let text = format!(
433 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
434 options.table_name, name, sql[1]
435 );
436 comments.push(text.clone());
437 } else {
438 let text = format!(
439 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
440 options.table_name, name, sql[1]
441 );
442 comments.push(text.clone());
443 };
444
445 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
446 let default_value = rows[0][default_pos + 9..].trim();
447 if !default_value.is_empty() {
448 comments.push(format!(
449 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
450 options.table_name, name, default_value
451 ));
452 }
453 }
454 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
457 .as_str()
458 .unwrap_or("YES");
459 let old_is_required = old_is_nullable == "NO";
460
461 if old_is_required && name != options.table_key {
463 comments.push(format!(
464 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
465 options.table_name, name
466 ));
467 }
468
469 if rows.len() > 1 {
470 comments.push(format!(
471 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
472 options.table_name,
473 name,
474 rows[1].trim()
475 ));
476 }
477 }
478
479 let mut unique_new = vec![];
480 let mut index_new = vec![];
481 let mut primary_key = vec![];
482 let (_, index_list) = self.query(
483 format!(
484 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
485 options.table_name
486 )
487 .as_str(),
488 );
489 for item in index_list.members() {
490 let key_name = item["indexname"].as_str().unwrap_or("");
491 let indexdef = item["indexdef"].to_string();
492
493 if indexdef.contains(
494 format!(
495 "CREATE UNIQUE INDEX {}_{} ON",
496 options.table_name, options.table_key
497 )
498 .as_str(),
499 ) {
500 primary_key.push(key_name.to_string());
501 continue;
502 }
503 if indexdef.contains("CREATE UNIQUE INDEX") {
504 unique_new.push(key_name.to_string());
505 continue;
506 }
507 if indexdef.contains("CREATE INDEX") {
508 index_new.push(key_name.to_string());
509 continue;
510 }
511 }
512
513 if !options.table_unique.is_empty() {
514 let full_name = format!(
515 "{}_unique_{}",
516 options.table_name,
517 options.table_unique.join("_")
518 );
519 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
520 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
521 let unique = format!(
522 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
523 name,
524 options.table_name,
525 options.table_unique.join(",")
526 );
527 if !unique_new.contains(&name) {
528 comments.push(unique);
529 }
530 unique_new.retain(|x| *x != name);
531 }
532
533 for row in options.table_index.iter() {
534 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
535 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
536 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
537 let index = format!(
538 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
539 name,
540 options.table_name,
541 row.join(",")
542 );
543 if !index_new.contains(&name) {
544 comments.push(index);
545 }
546 index_new.retain(|x| *x != name);
547 }
548
549 for item in unique_new {
550 if item.ends_with("_pkey") {
551 continue;
552 }
553 if item.starts_with("unique_") {
554 comments.push(format!(
555 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
556 options.table_name,
557 item.clone()
558 ));
559 } else {
560 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
561 }
562 }
563 for item in index_new {
564 if item.ends_with("_pkey") {
565 continue;
566 }
567 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
568 }
569
570 if self.params.sql {
571 return JsonValue::from(comments.join(""));
572 }
573
574 if comments.is_empty() {
575 return JsonValue::from(-1);
576 }
577
578 for item in comments.iter() {
579 let (state, res) = self.execute(item.as_str());
580 match state {
581 true => {}
582 false => {
583 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
584 return JsonValue::from(0);
585 }
586 }
587 }
588 JsonValue::from(1)
589 }
590
591 fn table_info(&mut self, table: &str) -> JsonValue {
592 let sql = format!(
593 "SELECT COL.COLUMN_NAME,
594 COL.DATA_TYPE,
595 COL.IS_NULLABLE,
596 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
597 LEFT JOIN
598 pg_catalog.pg_description DESCRIPTION
599 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
600 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
601 let (state, data) = self.query(sql.as_str());
602 let mut list = object! {};
603 if state {
604 for item in data.members() {
605 let mut row = object! {};
606 row["field"] = item["column_name"].clone();
607 row["comment"] = item["comment"].clone();
608 row["type"] = item["data_type"].clone();
609 row["is_nullable"] = item["is_nullable"].clone();
610 if let Some(field_name) = row["field"].as_str() {
611 list[field_name] = row.clone();
612 }
613 }
614 list
615 } else {
616 list
617 }
618 }
619
620 fn table_is_exist(&mut self, name: &str) -> bool {
621 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
622 let (state, data) = self.query(sql.as_str());
623 match state {
624 true => {
625 for item in data.members() {
626 if item.has_key("exists") {
627 return item["exists"].as_bool().unwrap_or(false);
628 }
629 }
630 false
631 }
632 false => false,
633 }
634 }
635
636 fn table(&mut self, name: &str) -> &mut Pgsql {
637 self.params = Params::default(self.connection.mode.str().as_str());
638 let table_name = format!("{}{}", self.connection.prefix, name);
639 if !super::sql_safety::validate_table_name(&table_name) {
640 error!("Invalid table name: {}", name);
641 }
642 self.params.table = table_name.clone();
643 self.params.join_table = table_name;
644 self
645 }
646
647 fn change_table(&mut self, name: &str) -> &mut Self {
648 self.params.join_table = name.to_string();
649 self
650 }
651
652 fn autoinc(&mut self) -> &mut Self {
653 self.params.autoinc = true;
654 self
655 }
656
657 fn timestamps(&mut self) -> &mut Self {
658 self.params.timestamps = true;
659 self
660 }
661
662 fn fetch_sql(&mut self) -> &mut Self {
663 self.params.sql = true;
664 self
665 }
666
667 fn order(&mut self, field: &str, by: bool) -> &mut Self {
668 self.params.order[field] = {
669 if by {
670 "DESC"
671 } else {
672 "ASC"
673 }
674 }
675 .into();
676 self
677 }
678
679 fn group(&mut self, field: &str) -> &mut Self {
680 let fields: Vec<&str> = field.split(",").collect();
681 for field in fields.iter() {
682 let field = field.to_string();
683 self.params.group[field.as_str()] = field.clone().into();
684 self.params.fields[field.as_str()] = field.clone().into();
685 }
686 self
687 }
688
689 fn distinct(&mut self) -> &mut Self {
690 self.params.distinct = true;
691 self
692 }
693
694 fn json(&mut self, field: &str) -> &mut Self {
695 let list: Vec<&str> = field.split(",").collect();
696 for item in list.iter() {
697 self.params.json[item.to_string().as_str()] = item.to_string().into();
698 }
699 self
700 }
701
702 fn location(&mut self, field: &str) -> &mut Self {
703 let list: Vec<&str> = field.split(",").collect();
704 for item in list.iter() {
705 self.params.location[item.to_string().as_str()] = item.to_string().into();
706 }
707 self
708 }
709
710 fn field(&mut self, field: &str) -> &mut Self {
711 let list: Vec<&str> = field.split(",").collect();
712 let join_table = if self.params.join_table.is_empty() {
713 self.params.table.clone()
714 } else {
715 self.params.join_table.clone()
716 };
717 for item in list.iter() {
718 if item.contains(" as ") {
719 let text = item.split(" as ").collect::<Vec<&str>>();
720 if text[0].contains("count(") {
721 self.params.fields[item.to_string().as_str()] =
722 format!("{} as {}", text[0], text[1]).into();
723 } else {
724 self.params.fields[item.to_string().as_str()] =
725 format!("{}.{} as {}", join_table, text[0], text[1]).into();
726 }
727 } else {
728 self.params.fields[item.to_string().as_str()] =
729 format!("{join_table}.{item}").into();
730 }
731 }
732 self
733 }
734
735 fn field_raw(&mut self, expr: &str) -> &mut Self {
736 self.params.fields[expr] = expr.into();
737 self
738 }
739
740 fn hidden(&mut self, name: &str) -> &mut Self {
741 let hidden: Vec<&str> = name.split(",").collect();
742
743 let fields_list = self.table_info(self.params.clone().table.as_str());
744 let mut data = array![];
745 for item in fields_list.members() {
746 let _ = data.push(object! {
747 "name":item["field"].as_str().unwrap_or("")
748 });
749 }
750
751 for item in data.members() {
752 let name = item["name"].as_str().unwrap_or("");
753 if !hidden.contains(&name) {
754 self.params.fields[name] = name.into();
755 }
756 }
757 self
758 }
759
760 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
761 for f in field.split('|') {
762 if !super::sql_safety::validate_field_name(f) {
763 error!("Invalid field name: {}", f);
764 }
765 }
766 if !super::sql_safety::validate_compare_orator(compare) {
767 error!("Invalid compare operator: {}", compare);
768 }
769 let join_table = if self.params.join_table.is_empty() {
770 self.params.table.clone()
771 } else {
772 self.params.join_table.clone()
773 };
774 if value.is_boolean() {
775 let bool_val = value.as_bool().unwrap_or(false);
776 self.params
777 .where_and
778 .push(format!("{join_table}.{field} {compare} {bool_val}"));
779 return self;
780 }
781 match compare {
782 "between" => {
783 self.params.where_and.push(format!(
784 "{}.{} between '{}' AND '{}'",
785 join_table, field, value[0], value[1]
786 ));
787 }
788 "set" => {
789 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
790 let mut wheredata = vec![];
791 for item in list.iter() {
792 wheredata.push(format!(
793 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
794 ));
795 }
796 self.params
797 .where_and
798 .push(format!("({})", wheredata.join(" or ")));
799 }
800 "notin" => {
801 let mut text = String::new();
802 for item in value.members() {
803 text = format!("{text},'{item}'");
804 }
805 text = text.trim_start_matches(",").into();
806 self.params
807 .where_and
808 .push(format!("{join_table}.{field} not in ({text})"));
809 }
810 "is" => {
811 self.params
812 .where_and
813 .push(format!("{join_table}.{field} is {value}"));
814 }
815 "isnot" => {
816 self.params
817 .where_and
818 .push(format!("{join_table}.{field} is not {value}"));
819 }
820 "notlike" => {
821 self.params
822 .where_and
823 .push(format!("{join_table}.{field} not like '{value}'"));
824 }
825 "in" => {
826 let mut text = String::new();
827 if value.is_array() {
828 for item in value.members() {
829 text = format!("{text},'{item}'");
830 }
831 } else if value.is_null() {
832 text = format!("{text},null");
833 } else {
834 let value = value.as_str().unwrap_or("");
835
836 let value: Vec<&str> = value.split(",").collect();
837 for item in value.iter() {
838 text = format!("{text},'{item}'");
839 }
840 }
841 text = text.trim_start_matches(",").into();
842
843 self.params
844 .where_and
845 .push(format!("{join_table}.{field} {compare} ({text})"));
846 }
847 _ => {
848 self.params
849 .where_and
850 .push(format!("{join_table}.{field} {compare} '{value}'"));
851 }
852 }
853 self
854 }
855
856 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
857 for f in field.split('|') {
858 if !super::sql_safety::validate_field_name(f) {
859 error!("Invalid field name: {}", f);
860 }
861 }
862 if !super::sql_safety::validate_compare_orator(compare) {
863 error!("Invalid compare operator: {}", compare);
864 }
865 let join_table = if self.params.join_table.is_empty() {
866 self.params.table.clone()
867 } else {
868 self.params.join_table.clone()
869 };
870
871 if value.is_boolean() {
872 let bool_val = value.as_bool().unwrap_or(false);
873 self.params
874 .where_or
875 .push(format!("{join_table}.{field} {compare} {bool_val}"));
876 return self;
877 }
878
879 match compare {
880 "between" => {
881 self.params.where_or.push(format!(
882 "{}.{} between '{}' AND '{}'",
883 join_table, field, value[0], value[1]
884 ));
885 }
886 "set" => {
887 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
888 let mut wheredata = vec![];
889 for item in list.iter() {
890 wheredata.push(format!(
891 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
892 ));
893 }
894 self.params
895 .where_or
896 .push(format!("({})", wheredata.join(" or ")));
897 }
898 "notin" => {
899 let mut text = String::new();
900 for item in value.members() {
901 text = format!("{text},'{item}'");
902 }
903 text = text.trim_start_matches(",").into();
904 self.params
905 .where_or
906 .push(format!("{join_table}.{field} not in ({text})"));
907 }
908 "is" => {
909 self.params
910 .where_or
911 .push(format!("{join_table}.{field} is {value}"));
912 }
913 "isnot" => {
914 self.params
915 .where_or
916 .push(format!("{join_table}.{field} is not {value}"));
917 }
918 "in" => {
919 let mut text = String::new();
920 if value.is_array() {
921 for item in value.members() {
922 text = format!("{text},'{item}'");
923 }
924 } else {
925 let value = value.as_str().unwrap_or("");
926 let value: Vec<&str> = value.split(",").collect();
927 for item in value.iter() {
928 text = format!("{text},'{item}'");
929 }
930 }
931 text = text.trim_start_matches(",").into();
932 self.params
933 .where_or
934 .push(format!("{join_table}.{field} {compare} ({text})"));
935 }
936 _ => {
937 self.params
938 .where_or
939 .push(format!("{join_table}.{field} {compare} '{value}'"));
940 }
941 }
942 self
943 }
944
945 fn where_raw(&mut self, expr: &str) -> &mut Self {
946 self.params.where_and.push(expr.to_string());
947 self
948 }
949
950 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
951 self.params
952 .where_and
953 .push(format!("\"{field}\" IN ({sub_sql})"));
954 self
955 }
956
957 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
958 self.params
959 .where_and
960 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
961 self
962 }
963
964 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
965 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
966 self
967 }
968
969 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
970 self.params
971 .where_and
972 .push(format!("NOT EXISTS ({sub_sql})"));
973 self
974 }
975
976 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
977 self.params.where_column = format!(
978 "{}.{} {} {}.{}",
979 self.params.table, field_a, compare, self.params.table, field_b
980 );
981 self
982 }
983
984 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
985 self.params
986 .update_column
987 .push(format!("{field_a} = {compare}"));
988 self
989 }
990
991 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
992 self.params.page = page;
993 self.params.limit = limit;
994 self
995 }
996
997 fn limit(&mut self, count: i32) -> &mut Self {
998 self.params.limit_only = count;
999 self
1000 }
1001
1002 fn column(&mut self, field: &str) -> JsonValue {
1003 self.field(field);
1004 let sql = self.params.select_sql();
1005
1006 if self.params.sql {
1007 return JsonValue::from(sql);
1008 }
1009 let (state, data) = self.query(sql.as_str());
1010 match state {
1011 true => {
1012 let mut list = array![];
1013 for item in data.members() {
1014 if self.params.json[field].is_empty() {
1015 let _ = list.push(item[field].clone());
1016 } else {
1017 let data =
1018 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1019 let _ = list.push(data);
1020 }
1021 }
1022 list
1023 }
1024 false => {
1025 array![]
1026 }
1027 }
1028 }
1029
1030 fn count(&mut self) -> JsonValue {
1031 self.params.fields = json::object! {};
1032 self.params.fields["count"] = "count(*) as count".to_string().into();
1033 let sql = self.params.select_sql();
1034 if self.params.sql {
1035 return JsonValue::from(sql.clone());
1036 }
1037 let (state, data) = self.query(sql.as_str());
1038 if state {
1039 data[0]["count"].clone()
1040 } else {
1041 JsonValue::from(0)
1042 }
1043 }
1044
1045 fn max(&mut self, field: &str) -> JsonValue {
1046 self.params.fields[field] = format!("max({field}) as {field}").into();
1047 let sql = self.params.select_sql();
1048 if self.params.sql {
1049 return JsonValue::from(sql.clone());
1050 }
1051 let (state, data) = self.query(sql.as_str());
1052 if state {
1053 if data.len() > 1 {
1054 return data.clone();
1055 }
1056 data[0][field].clone()
1057 } else {
1058 JsonValue::from(0)
1059 }
1060 }
1061
1062 fn min(&mut self, field: &str) -> JsonValue {
1063 self.params.fields[field] = format!("min({field}) as {field}").into();
1064 let sql = self.params.select_sql();
1065 if self.params.sql {
1066 return JsonValue::from(sql.clone());
1067 }
1068 let (state, data) = self.query(sql.as_str());
1069 if state {
1070 if data.len() > 1 {
1071 return data;
1072 }
1073 data[0][field].clone()
1074 } else {
1075 JsonValue::from(0)
1076 }
1077 }
1078
1079 fn sum(&mut self, field: &str) -> JsonValue {
1080 self.params.fields[field] = format!("sum({field}) as {field}").into();
1081 let sql = self.params.select_sql();
1082 if self.params.sql {
1083 return JsonValue::from(sql.clone());
1084 }
1085 let (state, data) = self.query(sql.as_str());
1086 match state {
1087 true => {
1088 if data.len() > 1 {
1089 return data;
1090 }
1091 data[0][field].clone()
1092 }
1093 false => JsonValue::from(0),
1094 }
1095 }
1096
1097 fn avg(&mut self, field: &str) -> JsonValue {
1098 self.params.fields[field] = format!("avg({field}) as {field}").into();
1099 let sql = self.params.select_sql();
1100 if self.params.sql {
1101 return JsonValue::from(sql.clone());
1102 }
1103 let (state, data) = self.query(sql.as_str());
1104 if state {
1105 if data.len() > 1 {
1106 return data;
1107 }
1108 data[0][field].clone()
1109 } else {
1110 JsonValue::from(0)
1111 }
1112 }
1113
1114 fn having(&mut self, expr: &str) -> &mut Self {
1115 self.params.having.push(expr.to_string());
1116 self
1117 }
1118
1119 fn select(&mut self) -> JsonValue {
1120 let sql = self.params.select_sql();
1121 if self.params.sql {
1122 return JsonValue::from(sql.clone());
1123 }
1124 let (state, mut data) = self.query(sql.as_str());
1125 match state {
1126 true => {
1127 for (field, _) in self.params.json.entries() {
1128 for item in data.members_mut() {
1129 if !item[field].is_empty() {
1130 let json = item[field].to_string();
1131 item[field] = match json::parse(&json) {
1132 Ok(e) => e,
1133 Err(_) => JsonValue::from(json),
1134 };
1135 }
1136 }
1137 }
1138 data.clone()
1139 }
1140 false => array![],
1141 }
1142 }
1143
1144 fn find(&mut self) -> JsonValue {
1145 self.params.page = 1;
1146 self.params.limit = 1;
1147 let sql = self.params.select_sql();
1148 if self.params.sql {
1149 return JsonValue::from(sql.clone());
1150 }
1151 let (state, mut data) = self.query(sql.as_str());
1152 match state {
1153 true => {
1154 if data.is_empty() {
1155 return object! {};
1156 }
1157 for (field, _) in self.params.json.entries() {
1158 if !data[0][field].is_empty() {
1159 let json = data[0][field].to_string();
1160 let json = json::parse(&json).unwrap_or(array![]);
1161 data[0][field] = json;
1162 } else {
1163 data[0][field] = array![];
1164 }
1165 }
1166 data[0].clone()
1167 }
1168 false => {
1169 object! {}
1170 }
1171 }
1172 }
1173
1174 fn value(&mut self, field: &str) -> JsonValue {
1175 self.params.fields = object! {};
1176 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1177 self.params.page = 1;
1178 self.params.limit = 1;
1179 let sql = self.params.select_sql();
1180 if self.params.sql {
1181 return JsonValue::from(sql.clone());
1182 }
1183 let (state, mut data) = self.query(sql.as_str());
1184 match state {
1185 true => {
1186 for (field, _) in self.params.json.entries() {
1187 if !data[0][field].is_empty() {
1188 let json = data[0][field].to_string();
1189 let json = json::parse(&json).unwrap_or(array![]);
1190 data[0][field] = json;
1191 } else {
1192 data[0][field] = array![];
1193 }
1194 }
1195 data[0][field].clone()
1196 }
1197 false => {
1198 if self.connection.debug {
1199 info!("{data:?}");
1200 }
1201 JsonValue::Null
1202 }
1203 }
1204 }
1205
1206 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1207 let mut fields = vec![];
1208 let mut values = vec![];
1209 if !self.params.autoinc && data["id"].is_empty() {
1210 let thread_id = format!("{:?}", std::thread::current().id());
1211 let thread_num: u64 = thread_id
1212 .trim_start_matches("ThreadId(")
1213 .trim_end_matches(")")
1214 .parse()
1215 .unwrap_or(0);
1216 data["id"] = format!(
1217 "{:X}{:X}",
1218 Local::now().timestamp_nanos_opt().unwrap_or(0),
1219 thread_num
1220 )
1221 .into();
1222 }
1223 for (field, value) in data.entries() {
1224 fields.push(format!("\"{}\"", field));
1225
1226 if value.is_string() {
1227 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1228 continue;
1229 } else if value.is_array() {
1230 if self.params.json[field].is_empty() {
1231 let array = value
1232 .members()
1233 .map(|x| x.as_str().unwrap_or(""))
1234 .collect::<Vec<&str>>()
1235 .join(",");
1236 values.push(format!("'{array}'"));
1237 } else {
1238 let json = value.to_string();
1239 let json = json.replace("'", "''");
1240 values.push(format!("'{json}'"));
1241 }
1242 continue;
1243 } else if value.is_object() {
1244 if self.params.json[field].is_empty() {
1245 values.push(format!("'{value}'"));
1246 } else {
1247 let json = value.to_string();
1248 let json = json.replace("'", "''");
1249 values.push(format!("'{json}'"));
1250 }
1251 continue;
1252 } else if value.is_number() || value.is_boolean() || value.is_null() {
1253 values.push(format!("{value}"));
1254 continue;
1255 } else {
1256 values.push(format!("'{value}'"));
1257 continue;
1258 }
1259 }
1260 let fields = fields.join(",");
1261 let values = values.join(",");
1262
1263 let sql = format!(
1264 "INSERT INTO {} ({}) VALUES ({});",
1265 self.params.table, fields, values
1266 );
1267 if self.params.sql {
1268 return JsonValue::from(sql.clone());
1269 }
1270 let (state, ids) = self.execute(sql.as_str());
1271
1272 match state {
1273 true => match self.params.autoinc {
1274 true => ids.clone(),
1275 false => data["id"].clone(),
1276 },
1277 false => {
1278 let thread_id = format!("{:?}", thread::current().id());
1279 error!("添加失败: {thread_id} {ids:?} {sql}");
1280 JsonValue::from("")
1281 }
1282 }
1283 }
1284 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1285 let mut fields = String::new();
1286 if !self.params.autoinc && data[0]["id"].is_empty() {
1287 data[0]["id"] = "".into();
1288 }
1289 for (field, _) in data[0].entries() {
1290 fields = format!("{fields},\"{field}\"");
1291 }
1292 fields = fields.trim_start_matches(",").to_string();
1293
1294 let core_count = num_cpus::get();
1295 let mut p = pools::Pool::new(core_count * 4);
1296
1297 let autoinc = self.params.autoinc;
1298 for list in data.members() {
1299 let mut item = list.clone();
1300 let i = br_fields::str::Code::verification_code(3);
1301 p.execute(move |pcindex| {
1302 if !autoinc && item["id"].is_empty() {
1303 let id = format!(
1304 "{:X}{:X}{}",
1305 Local::now().timestamp_nanos_opt().unwrap_or(0),
1306 pcindex,
1307 i
1308 );
1309 item["id"] = id.into();
1310 }
1311 let mut values = "".to_string();
1312 for (_, value) in item.entries() {
1313 if value.is_string() {
1314 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1315 } else if value.is_number() {
1316 values = format!("{values},{value}");
1317 } else if value.is_boolean() {
1318 values = format!("{values},{value}");
1319 continue;
1320 } else {
1321 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1322 }
1323 }
1324 values = format!("({})", values.trim_start_matches(","));
1325 array![item["id"].clone(), values]
1326 });
1327 }
1328 let (ids_list, mut values) = p.insert_all();
1329 values = values.trim_start_matches(",").to_string();
1330 let sql = format!(
1331 "INSERT INTO {} ({}) VALUES {};",
1332 self.params.table, fields, values
1333 );
1334
1335 if self.params.sql {
1336 return JsonValue::from(sql.clone());
1337 }
1338 let (state, data) = self.execute(sql.as_str());
1339 match state {
1340 true => match autoinc {
1341 true => data,
1342 false => JsonValue::from(ids_list),
1343 },
1344 false => {
1345 error!("insert_all: {data:?}");
1346 array![]
1347 }
1348 }
1349 }
1350 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1351 let mut fields = vec![];
1352 let mut values = vec![];
1353 if !self.params.autoinc && data["id"].is_empty() {
1354 let thread_id = format!("{:?}", std::thread::current().id());
1355 let thread_num: u64 = thread_id
1356 .trim_start_matches("ThreadId(")
1357 .trim_end_matches(")")
1358 .parse()
1359 .unwrap_or(0);
1360 data["id"] = format!(
1361 "{:X}{:X}",
1362 Local::now().timestamp_nanos_opt().unwrap_or(0),
1363 thread_num
1364 )
1365 .into();
1366 }
1367 for (field, value) in data.entries() {
1368 fields.push(format!("\"{}\"", field));
1369
1370 if value.is_string() {
1371 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1372 continue;
1373 } else if value.is_array() {
1374 if self.params.json[field].is_empty() {
1375 let array = value
1376 .members()
1377 .map(|x| x.as_str().unwrap_or(""))
1378 .collect::<Vec<&str>>()
1379 .join(",");
1380 values.push(format!("'{array}'"));
1381 } else {
1382 let json = value.to_string();
1383 let json = json.replace("'", "''");
1384 values.push(format!("'{json}'"));
1385 }
1386 continue;
1387 } else if value.is_object() {
1388 if self.params.json[field].is_empty() {
1389 values.push(format!("'{value}'"));
1390 } else {
1391 let json = value.to_string();
1392 let json = json.replace("'", "''");
1393 values.push(format!("'{json}'"));
1394 }
1395 continue;
1396 } else if value.is_number() || value.is_boolean() || value.is_null() {
1397 values.push(format!("{value}"));
1398 continue;
1399 } else {
1400 values.push(format!("'{value}'"));
1401 continue;
1402 }
1403 }
1404
1405 let conflict_cols: Vec<String> = conflict_fields
1406 .iter()
1407 .map(|f| format!("\"{}\"", f))
1408 .collect();
1409
1410 let update_set: Vec<String> = fields
1411 .iter()
1412 .filter(|f| {
1413 let name = f.trim_matches('"');
1414 !conflict_fields.contains(&name) && name != "id"
1415 })
1416 .map(|f| format!("{f}=EXCLUDED.{f}"))
1417 .collect();
1418
1419 let fields_str = fields.join(",");
1420 let values_str = values.join(",");
1421
1422 let sql = format!(
1423 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1424 self.params.table,
1425 fields_str,
1426 values_str,
1427 conflict_cols.join(","),
1428 update_set.join(",")
1429 );
1430 if self.params.sql {
1431 return JsonValue::from(sql.clone());
1432 }
1433 let (state, result) = self.execute(sql.as_str());
1434 match state {
1435 true => match self.params.autoinc {
1436 true => result.clone(),
1437 false => data["id"].clone(),
1438 },
1439 false => {
1440 let thread_id = format!("{:?}", thread::current().id());
1441 error!("upsert失败: {thread_id} {result:?} {sql}");
1442 JsonValue::from("")
1443 }
1444 }
1445 }
1446 fn update(&mut self, data: JsonValue) -> JsonValue {
1447 let mut values = vec![];
1448 for (field, value) in data.entries() {
1449 if value.is_string() {
1450 values.push(format!(
1451 "\"{}\"='{}'",
1452 field,
1453 value.to_string().replace("'", "''")
1454 ));
1455 } else if value.is_number() {
1456 values.push(format!("\"{field}\"= {value}"));
1457 } else if value.is_array() {
1458 if self.params.json[field].is_empty() {
1459 let array = value
1460 .members()
1461 .map(|x| x.as_str().unwrap_or(""))
1462 .collect::<Vec<&str>>()
1463 .join(",");
1464 values.push(format!("\"{field}\"='{array}'"));
1465 } else {
1466 let json = value.to_string();
1467 let json = json.replace("'", "''");
1468 values.push(format!("\"{field}\"='{json}'"));
1469 }
1470 continue;
1471 } else if value.is_object() {
1472 if self.params.json[field].is_empty() {
1473 values.push(format!("\"{field}\"='{value}'"));
1474 } else {
1475 if value.is_empty() {
1476 values.push(format!("\"{field}\"=''"));
1477 continue;
1478 }
1479 let json = value.to_string();
1480 let json = json.replace("'", "''");
1481 values.push(format!("\"{field}\"='{json}'"));
1482 }
1483 continue;
1484 } else if value.is_boolean() {
1485 values.push(format!("\"{field}\"= {value}"));
1486 } else {
1487 values.push(format!("\"{field}\"=\"{value}\""));
1488 }
1489 }
1490
1491 for (field, value) in self.params.inc_dec.entries() {
1492 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1493 }
1494 if !self.params.update_column.is_empty() {
1495 values.extend(self.params.update_column.clone());
1496 }
1497 let values = values.join(",");
1498
1499 let sql = format!(
1500 "UPDATE {} SET {} {};",
1501 self.params.table.clone(),
1502 values,
1503 self.params.where_sql()
1504 );
1505 if self.params.sql {
1506 return JsonValue::from(sql.clone());
1507 }
1508 let (state, data) = self.execute(sql.as_str());
1509 if state {
1510 data
1511 } else {
1512 let thread_id = format!("{:?}", thread::current().id());
1513 error!("update: {thread_id} {data:?} {sql}");
1514 0.into()
1515 }
1516 }
1517 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1518 let mut values = vec![];
1519
1520 let mut ids = vec![];
1521 for (field, _) in data[0].entries() {
1522 if field == "id" {
1523 continue;
1524 }
1525 let mut fields = vec![];
1526 for row in data.members() {
1527 let value = row[field].clone();
1528 let id = row["id"].clone();
1529 ids.push(id.clone());
1530 if value.is_string() {
1531 fields.push(format!(
1532 "WHEN '{}' THEN '{}'",
1533 id,
1534 value.to_string().replace("'", "''")
1535 ));
1536 } else if value.is_array() || value.is_object() {
1537 if self.params.json[field].is_empty() {
1538 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1539 } else {
1540 let json = value.to_string();
1541 let json = json.replace("'", "''");
1542 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1543 }
1544 continue;
1545 } else if value.is_number() || value.is_boolean() || value.is_null() {
1546 fields.push(format!("WHEN '{id}' THEN {value}"));
1547 } else {
1548 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1549 }
1550 }
1551 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1552 }
1553 self.where_and("id", "in", ids.into());
1554 for (field, value) in self.params.inc_dec.entries() {
1555 values.push(format!("{} = {}", field, value.to_string().clone()));
1556 }
1557
1558 let values = values.join(",");
1559 let sql = format!(
1560 "UPDATE {} SET {} {} {};",
1561 self.params.table.clone(),
1562 values,
1563 self.params.where_sql(),
1564 self.params.page_limit_sql()
1565 );
1566 if self.params.sql {
1567 return JsonValue::from(sql.clone());
1568 }
1569 let (state, data) = self.execute(sql.as_str());
1570 if state {
1571 data
1572 } else {
1573 error!("update_all: {data:?}");
1574 JsonValue::from(0)
1575 }
1576 }
1577
1578 fn delete(&mut self) -> JsonValue {
1579 let sql = format!(
1580 "delete FROM {} {} {};",
1581 self.params.table.clone(),
1582 self.params.where_sql(),
1583 self.params.page_limit_sql()
1584 );
1585 if self.params.sql {
1586 return JsonValue::from(sql.clone());
1587 }
1588 let (state, data) = self.execute(sql.as_str());
1589 match state {
1590 true => data,
1591 false => {
1592 error!("delete 失败>>> {data:?}");
1593 JsonValue::from(0)
1594 }
1595 }
1596 }
1597
1598 fn transaction(&mut self) -> bool {
1599 let thread_id = format!("{:?}", thread::current().id());
1600 let key = format!("{}{}", self.default, thread_id);
1601
1602 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1603 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1604 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1605 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1606 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1607 return true;
1608 }
1609
1610 let mut conn = match self.client.get_connect_for_transaction() {
1611 Ok(c) => c,
1612 Err(e) => {
1613 error!("获取事务连接失败: {e}");
1614 return false;
1615 }
1616 };
1617
1618 if !conn.is_valid() {
1619 error!("事务连接无效");
1620 self.client.release_transaction_conn();
1621 return false;
1622 }
1623
1624 if let Err(e) = conn.execute("START TRANSACTION") {
1625 error!("启动事务失败: {e}");
1626 self.client.release_transaction_conn();
1627 return false;
1628 }
1629
1630 if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1631 error!("设置事务隔离级别失败: {e}");
1632 let _ = conn.execute("ROLLBACK");
1633 self.client.release_transaction_conn();
1634 return false;
1635 }
1636
1637 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1638 true
1639 }
1640 fn commit(&mut self) -> bool {
1641 let thread_id = format!("{:?}", thread::current().id());
1642 let key = format!("{}{}", self.default, thread_id);
1643
1644 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1645 error!("commit: 没有活跃的事务");
1646 return false;
1647 }
1648
1649 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1650 if depth > 1 {
1651 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1652 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1653 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1654 return true;
1655 }
1656
1657 let commit_result =
1658 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1659
1660 let success = match commit_result {
1661 Some(Ok(_)) => true,
1662 Some(Err(e)) => {
1663 error!("提交事务失败: {e}");
1664 false
1665 }
1666 None => {
1667 error!("提交事务失败: 未找到连接");
1668 false
1669 }
1670 };
1671
1672 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1673 self.client.release_transaction_conn();
1674 success
1675 }
1676
1677 fn rollback(&mut self) -> bool {
1678 let thread_id = format!("{:?}", thread::current().id());
1679 let key = format!("{}{}", self.default, thread_id);
1680
1681 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1682 error!("rollback: 没有活跃的事务");
1683 return false;
1684 }
1685
1686 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1687 if depth > 1 {
1688 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1689 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1690 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1691 return true;
1692 }
1693
1694 let rollback_result =
1695 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1696
1697 let success = match rollback_result {
1698 Some(Ok(_)) => true,
1699 Some(Err(e)) => {
1700 error!("回滚失败: {e}");
1701 false
1702 }
1703 None => {
1704 error!("回滚失败: 未找到连接");
1705 false
1706 }
1707 };
1708
1709 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1710 self.client.release_transaction_conn();
1711 success
1712 }
1713
1714 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1715 let (state, data) = self.query(sql);
1716 match state {
1717 true => Ok(data),
1718 false => Err(data.to_string()),
1719 }
1720 }
1721
1722 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1723 let (state, data) = self.execute(sql);
1724 match state {
1725 true => Ok(data),
1726 false => Err(data.to_string()),
1727 }
1728 }
1729
1730 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1731 self.params.inc_dec[field] = format!("{field} + {num}").into();
1732 self
1733 }
1734
1735 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1736 self.params.inc_dec[field] = format!("{field} - {num}").into();
1737 self
1738 }
1739 fn buildsql(&mut self) -> String {
1740 self.fetch_sql();
1741 let sql = self.select().to_string();
1742 format!("( {} ) {}", sql, self.params.table)
1743 }
1744
1745 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1746 for field in fields {
1747 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1748 }
1749 self
1750 }
1751
1752 fn join(
1753 &mut self,
1754 main_table: &str,
1755 main_fields: &str,
1756 right_table: &str,
1757 right_fields: &str,
1758 ) -> &mut Self {
1759 let main_table = if main_table.is_empty() {
1760 self.params.table.clone()
1761 } else {
1762 main_table.to_string()
1763 };
1764 self.params.join_table = right_table.to_string();
1765 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1766 self
1767 }
1768
1769 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1770 let main_fields = if main_fields.is_empty() {
1771 "id"
1772 } else {
1773 main_fields
1774 };
1775 let second_fields = if second_fields.is_empty() {
1776 self.params.table.clone()
1777 } else {
1778 second_fields.to_string().clone()
1779 };
1780 let sec_table_name = format!("{}{}", table, "_2");
1781 let second_table = format!("{} {}", table, sec_table_name.clone());
1782 self.params.join_table = sec_table_name.clone();
1783 self.params.join.push(format!(
1784 " INNER JOIN {} ON {}.{} = {}.{}",
1785 second_table, self.params.table, main_fields, sec_table_name, second_fields
1786 ));
1787 self
1788 }
1789
1790 fn join_right(
1791 &mut self,
1792 main_table: &str,
1793 main_fields: &str,
1794 right_table: &str,
1795 right_fields: &str,
1796 ) -> &mut Self {
1797 let main_table = if main_table.is_empty() {
1798 self.params.table.clone()
1799 } else {
1800 main_table.to_string()
1801 };
1802 self.params.join_table = right_table.to_string();
1803 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1804 self
1805 }
1806
1807 fn join_full(
1808 &mut self,
1809 main_table: &str,
1810 main_fields: &str,
1811 right_table: &str,
1812 right_fields: &str,
1813 ) -> &mut Self {
1814 let main_table = if main_table.is_empty() {
1815 self.params.table.clone()
1816 } else {
1817 main_table.to_string()
1818 };
1819 self.params.join_table = right_table.to_string();
1820 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1821 self
1822 }
1823
1824 fn union(&mut self, sub_sql: &str) -> &mut Self {
1825 self.params.unions.push(format!("UNION {sub_sql}"));
1826 self
1827 }
1828
1829 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1830 self.params.unions.push(format!("UNION ALL {sub_sql}"));
1831 self
1832 }
1833
1834 fn lock_for_update(&mut self) -> &mut Self {
1835 self.params.lock_mode = "FOR UPDATE".to_string();
1836 self
1837 }
1838
1839 fn lock_for_share(&mut self) -> &mut Self {
1840 self.params.lock_mode = "FOR SHARE".to_string();
1841 self
1842 }
1843}