1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use chrono::Local;
7use json::{array, object, JsonValue};
8use log::{error, info};
9use std::thread;
10#[derive(Clone)]
11pub struct Pgsql {
12 pub connection: Connection,
14 pub default: String,
16 pub params: Params,
17 pub client: Pools,
18}
19
20impl Pgsql {
21 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
22 let port = connection
23 .hostport
24 .parse::<i32>()
25 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
26
27 let cp_connection = connection.clone();
28 let config = object! {
29 debug: cp_connection.debug,
30 username: cp_connection.username,
31 userpass: cp_connection.userpass,
32 database: cp_connection.database,
33 hostname: cp_connection.hostname,
34 hostport: port,
35 charset: cp_connection.charset.str(),
36 };
37 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
38
39 let pools = pgsql.pools()?;
40 Ok(Self {
41 connection,
42 default: default.clone(),
43 params: Params::default("pgsql"),
44 client: pools,
45 })
46 }
47
48 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
49 let thread_id = format!("{:?}", thread::current().id());
50 let key = format!("{}{}", self.default, thread_id);
51
52 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
53 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
54
55 match result {
56 Some(Ok(e)) => {
57 if self.connection.debug {
58 info!("查询成功: {} {}", thread_id.clone(), sql);
59 }
60 (true, e.rows)
61 }
62 Some(Err(e)) => {
63 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
64 (false, JsonValue::from(e.to_string()))
65 }
66 None => {
67 error!("事务查询失败: 未找到事务连接 {thread_id}");
68 (false, JsonValue::from("未找到事务连接"))
69 }
70 }
71 } else {
72 let mut guard = match self.client.get_guard() {
73 Ok(g) => g,
74 Err(e) => {
75 error!(
76 "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
77 );
78 return (false, JsonValue::from(e.to_string()));
79 }
80 };
81
82 let res = guard.conn().query(sql);
83 match res {
84 Ok(e) => {
85 if self.connection.debug {
86 info!("查询成功: {} {}", thread_id.clone(), sql);
87 }
88 (true, e.rows)
89 }
90 Err(e) => {
91 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
92 (false, JsonValue::from(e.to_string()))
93 }
94 }
95 }
96 }
97 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
98 let thread_id = format!("{:?}", thread::current().id());
99 let key = format!("{}{}", self.default, thread_id);
100
101 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
102 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
103
104 match result {
105 Some(Ok(e)) => {
106 if self.connection.debug {
107 info!("提交成功: {} {}", thread_id.clone(), sql);
108 }
109 if sql.contains("INSERT") {
110 (true, e.rows)
111 } else {
112 (true, e.affect_count.into())
113 }
114 }
115 Some(Err(e)) => {
116 error!("事务提交失败: {thread_id} {e} {sql}");
117 (false, JsonValue::from(e.to_string()))
118 }
119 None => {
120 error!("事务执行失败: 未找到事务连接 {thread_id}");
121 (false, JsonValue::from("未找到事务连接"))
122 }
123 }
124 } else {
125 let mut guard = match self.client.get_guard() {
126 Ok(g) => g,
127 Err(e) => {
128 error!(
129 "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
130 );
131 return (false, JsonValue::from(e.to_string()));
132 }
133 };
134
135 let res = guard.conn().execute(sql);
136 match res {
137 Ok(e) => {
138 if self.connection.debug {
139 info!("提交成功: {} {}", thread_id.clone(), sql);
140 }
141 if sql.contains("INSERT") {
142 (true, e.rows)
143 } else {
144 (true, e.affect_count.into())
145 }
146 }
147 Err(e) => {
148 error!("非事务提交失败: {thread_id} {e} {sql}");
149 (false, JsonValue::from(e.to_string()))
150 }
151 }
152 }
153 }
154}
155
156impl DbMode for Pgsql {
157 fn database_tables(&mut self) -> JsonValue {
158 let sql = "SHOW TABLES".to_string();
159 match self.sql(sql.as_str()) {
160 Ok(e) => {
161 let mut list = vec![];
162 for item in e.members() {
163 for (_, value) in item.entries() {
164 list.push(value.clone());
165 }
166 }
167 list.into()
168 }
169 Err(_) => {
170 array![]
171 }
172 }
173 }
174
175 fn database_create(&mut self, name: &str) -> bool {
176 let sql = format!("CREATE DATABASE {name}");
177
178 let (state, data) = self.execute(sql.as_str());
179 match state {
180 true => data.as_bool().unwrap_or(true),
181 false => {
182 error!("创建数据库失败: {data:?}");
183 false
184 }
185 }
186 }
187
188 fn truncate(&mut self, table: &str) -> bool {
189 let sql = format!("TRUNCATE TABLE {table}");
190 let (state, _) = self.execute(sql.as_str());
191 state
192 }
193}
194
195impl Mode for Pgsql {
196 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
197 let mut sql = String::new();
198 let mut comments = vec![];
199
200 if !options.table_unique.is_empty() {
201 let full_name = format!(
202 "{}_unique_{}",
203 options.table_name,
204 options.table_unique.join("_")
205 );
206 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
207 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
208 let unique = format!(
209 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
210 name,
211 options.table_name,
212 options.table_unique.join(",")
213 );
214 comments.push(unique);
215 }
216
217 for row in options.table_index.iter() {
218 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
219 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
220 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
221 let index = format!(
222 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
223 name,
224 options.table_name,
225 row.join(",")
226 );
227 comments.push(index);
228 }
229
230 for (name, field) in options.table_fields.entries_mut() {
231 field["table_name"] = options.table_name.clone().into();
232 let row = br_fields::field("pgsql", name, field.clone());
233 let rows = row.split("comment").collect::<Vec<&str>>();
234 if rows.len() > 1 {
235 comments.push(format!(
236 "COMMENT ON COLUMN {}.\"{}\" IS {};",
237 options.table_name, name, rows[1]
238 ));
239 }
240 sql = format!("{} {},\r\n", sql, rows[0]);
241 }
242
243 let primary_key = format!(
244 "CONSTRAINT {}_{} PRIMARY KEY ({})",
245 options.table_name, options.table_key, options.table_key
246 );
247 let sql = format!(
248 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
249 options.table_name,
250 sql.trim_end_matches(",\r\n"),
251 primary_key
252 );
253 comments.insert(0, sql);
254
255 for (_name, field) in options.table_fields.entries() {
256 let _ = field["mode"].as_str();
257 }
258
259 if self.params.sql {
260 let info = comments.join("\r\n");
261 return JsonValue::from(info);
262 }
263 for comment in comments {
264 let (state, _) = self.execute(comment.as_str());
265 match state {
266 true => {}
267 false => {
268 return JsonValue::from(state);
269 }
270 }
271 }
272 JsonValue::from(true)
273 }
274
275 fn table_update(&mut self, options: TableOptions) -> JsonValue {
276 let fields_list = self.table_info(&options.table_name);
277 let mut put = vec![];
278 let mut add = vec![];
279 let mut del = vec![];
280 let mut comments = vec![];
281
282 for (key, _) in fields_list.entries() {
283 if options.table_fields[key].is_empty() {
284 del.push(key);
285 }
286 }
287 for (name, field) in options.table_fields.entries() {
288 if !fields_list[name].is_empty() {
289 let old_info = &fields_list[name];
290 let new_field_sql = br_fields::field("pgsql", name, field.clone());
291
292 let old_comment = old_info["comment"].as_str().unwrap_or("");
293 let old_type = old_info["type"].as_str().unwrap_or("");
294
295 let new_comment = if let Some(idx) = new_field_sql.find("--") {
296 new_field_sql[idx + 2..].trim()
297 } else {
298 ""
299 };
300
301 let comment_matches =
302 if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
303 let old_parts: Vec<&str> = old_comment.split('|').collect();
304 let new_parts: Vec<&str> = new_comment.split('|').collect();
305 if old_parts.len() >= 4 && new_parts.len() >= 4 {
306 old_parts[..4] == new_parts[..4]
307 } else {
308 old_comment == new_comment
309 }
310 } else if !old_comment.is_empty() && !new_comment.is_empty() {
311 let old_parts: Vec<&str> = old_comment.split('|').collect();
312 let new_parts: Vec<&str> = new_comment.split('|').collect();
313 if old_parts.len() >= 2
314 && new_parts.len() >= 2
315 && old_parts.len() == new_parts.len()
316 {
317 let old_filtered: Vec<&str> = old_parts
318 .iter()
319 .filter(|v| **v != "true" && **v != "false")
320 .copied()
321 .collect();
322 let new_filtered: Vec<&str> = new_parts
323 .iter()
324 .filter(|v| **v != "true" && **v != "false")
325 .copied()
326 .collect();
327 old_filtered == new_filtered
328 } else {
329 old_comment == new_comment
330 }
331 } else {
332 old_comment == new_comment
333 };
334
335 let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
336 let new_type = if sql_parts.len() > 1 {
337 sql_parts[1].to_lowercase()
338 } else {
339 String::new()
340 };
341
342 let type_matches = match old_type {
343 "integer" => {
344 new_type.contains("int")
345 && !new_type.contains("bigint")
346 && !new_type.contains("smallint")
347 }
348 "bigint" => new_type.contains("bigint"),
349 "smallint" => new_type.contains("smallint"),
350 "boolean" => new_type.contains("boolean"),
351 "text" => new_type.contains("text"),
352 "character varying" => new_type.contains("varchar"),
353 "character" => new_type.contains("char") && !new_type.contains("varchar"),
354 "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
355 "double precision" => {
356 new_type.contains("double") || new_type.contains("float8")
357 }
358 "real" => new_type.contains("real") || new_type.contains("float4"),
359 "timestamp without time zone" | "timestamp with time zone" => {
360 new_type.contains("timestamp")
361 }
362 "date" => new_type.contains("date") && !new_type.contains("timestamp"),
363 "time without time zone" | "time with time zone" => {
364 new_type.contains("time") && !new_type.contains("timestamp")
365 }
366 "json" | "jsonb" => new_type.contains("json"),
367 "uuid" => new_type.contains("uuid"),
368 "bytea" => new_type.contains("bytea"),
369 _ => old_type == new_type,
370 };
371
372 if type_matches && comment_matches {
373 continue;
374 }
375
376 log::debug!(
377 "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
378 options.table_name,
379 name,
380 type_matches,
381 old_type,
382 new_type,
383 comment_matches
384 );
385 put.push(name);
386 } else {
387 add.push(name);
388 }
389 }
390
391 for name in add.iter() {
392 let name = name.to_string();
393 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
394 let rows = row.split("--").collect::<Vec<&str>>();
395 comments.push(format!(
396 r#"ALTER TABLE "{}" add {};"#,
397 options.table_name,
398 rows[0].trim()
399 ));
400 if rows.len() > 1 {
401 comments.push(format!(
402 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
403 options.table_name,
404 name,
405 rows[1].trim()
406 ));
407 }
408 }
409 for name in del.iter() {
410 comments.push(format!(
411 "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
412 options.table_name, name
413 ));
414 }
415 for name in put.iter() {
416 let name = name.to_string();
417 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
418 let rows = row.split("--").collect::<Vec<&str>>();
419
420 let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
421
422 if sql[1].contains("BOOLEAN") {
423 let text = format!(
424 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
425 options.table_name, name
426 );
427 comments.push(text.clone());
428 let text = format!(
429 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
430 options.table_name, name, sql[1]
431 );
432 comments.push(text.clone());
433 } else {
434 let text = format!(
435 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
436 options.table_name, name, sql[1]
437 );
438 comments.push(text.clone());
439 };
440
441 if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
442 let default_value = rows[0][default_pos + 9..].trim();
443 if !default_value.is_empty() {
444 comments.push(format!(
445 "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
446 options.table_name, name, default_value
447 ));
448 }
449 }
450 let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
453 .as_str()
454 .unwrap_or("YES");
455 let old_is_required = old_is_nullable == "NO";
456
457 if old_is_required && name != options.table_key {
459 comments.push(format!(
460 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
461 options.table_name, name
462 ));
463 }
464
465 if rows.len() > 1 {
466 comments.push(format!(
467 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
468 options.table_name,
469 name,
470 rows[1].trim()
471 ));
472 }
473 }
474
475 let mut unique_new = vec![];
476 let mut index_new = vec![];
477 let mut primary_key = vec![];
478 let (_, index_list) = self.query(
479 format!(
480 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
481 options.table_name
482 )
483 .as_str(),
484 );
485 for item in index_list.members() {
486 let key_name = item["indexname"].as_str().unwrap_or("");
487 let indexdef = item["indexdef"].to_string();
488
489 if indexdef.contains(
490 format!(
491 "CREATE UNIQUE INDEX {}_{} ON",
492 options.table_name, options.table_key
493 )
494 .as_str(),
495 ) {
496 primary_key.push(key_name.to_string());
497 continue;
498 }
499 if indexdef.contains("CREATE UNIQUE INDEX") {
500 unique_new.push(key_name.to_string());
501 continue;
502 }
503 if indexdef.contains("CREATE INDEX") {
504 index_new.push(key_name.to_string());
505 continue;
506 }
507 }
508
509 if !options.table_unique.is_empty() {
510 let full_name = format!(
511 "{}_unique_{}",
512 options.table_name,
513 options.table_unique.join("_")
514 );
515 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
516 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
517 let unique = format!(
518 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
519 name,
520 options.table_name,
521 options.table_unique.join(",")
522 );
523 if !unique_new.contains(&name) {
524 comments.push(unique);
525 }
526 unique_new.retain(|x| *x != name);
527 }
528
529 for row in options.table_index.iter() {
530 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
531 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
532 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
533 let index = format!(
534 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
535 name,
536 options.table_name,
537 row.join(",")
538 );
539 if !index_new.contains(&name) {
540 comments.push(index);
541 }
542 index_new.retain(|x| *x != name);
543 }
544
545 for item in unique_new {
546 if item.ends_with("_pkey") {
547 continue;
548 }
549 if item.starts_with("unique_") {
550 comments.push(format!(
551 "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
552 options.table_name,
553 item.clone()
554 ));
555 } else {
556 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
557 }
558 }
559 for item in index_new {
560 if item.ends_with("_pkey") {
561 continue;
562 }
563 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
564 }
565
566 if self.params.sql {
567 return JsonValue::from(comments.join(""));
568 }
569
570 if comments.is_empty() {
571 return JsonValue::from(-1);
572 }
573
574 for item in comments.iter() {
575 let (state, res) = self.execute(item.as_str());
576 match state {
577 true => {}
578 false => {
579 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
580 return JsonValue::from(0);
581 }
582 }
583 }
584 JsonValue::from(1)
585 }
586
587 fn table_info(&mut self, table: &str) -> JsonValue {
588 let sql = format!(
589 "SELECT COL.COLUMN_NAME,
590 COL.DATA_TYPE,
591 COL.IS_NULLABLE,
592 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
593 LEFT JOIN
594 pg_catalog.pg_description DESCRIPTION
595 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
596 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
597 let (state, data) = self.query(sql.as_str());
598 let mut list = object! {};
599 if state {
600 for item in data.members() {
601 let mut row = object! {};
602 row["field"] = item["column_name"].clone();
603 row["comment"] = item["comment"].clone();
604 row["type"] = item["data_type"].clone();
605 row["is_nullable"] = item["is_nullable"].clone();
606 if let Some(field_name) = row["field"].as_str() {
607 list[field_name] = row.clone();
608 }
609 }
610 list
611 } else {
612 list
613 }
614 }
615
616 fn table_is_exist(&mut self, name: &str) -> bool {
617 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
618 let (state, data) = self.query(sql.as_str());
619 match state {
620 true => {
621 for item in data.members() {
622 if item.has_key("exists") {
623 return item["exists"].as_bool().unwrap_or(false);
624 }
625 }
626 false
627 }
628 false => false,
629 }
630 }
631
632 fn table(&mut self, name: &str) -> &mut Pgsql {
633 self.params = Params::default(self.connection.mode.str().as_str());
634 let table_name = format!("{}{}", self.connection.prefix, name);
635 if !super::sql_safety::validate_table_name(&table_name) {
636 error!("Invalid table name: {}", name);
637 }
638 self.params.table = table_name.clone();
639 self.params.join_table = table_name;
640 self
641 }
642
643 fn change_table(&mut self, name: &str) -> &mut Self {
644 self.params.join_table = name.to_string();
645 self
646 }
647
648 fn autoinc(&mut self) -> &mut Self {
649 self.params.autoinc = true;
650 self
651 }
652
653 fn timestamps(&mut self) -> &mut Self {
654 self.params.timestamps = true;
655 self
656 }
657
658 fn fetch_sql(&mut self) -> &mut Self {
659 self.params.sql = true;
660 self
661 }
662
663 fn order(&mut self, field: &str, by: bool) -> &mut Self {
664 self.params.order[field] = {
665 if by {
666 "DESC"
667 } else {
668 "ASC"
669 }
670 }
671 .into();
672 self
673 }
674
675 fn group(&mut self, field: &str) -> &mut Self {
676 let fields: Vec<&str> = field.split(",").collect();
677 for field in fields.iter() {
678 let field = field.to_string();
679 self.params.group[field.as_str()] = field.clone().into();
680 self.params.fields[field.as_str()] = field.clone().into();
681 }
682 self
683 }
684
685 fn distinct(&mut self) -> &mut Self {
686 self.params.distinct = true;
687 self
688 }
689
690 fn json(&mut self, field: &str) -> &mut Self {
691 let list: Vec<&str> = field.split(",").collect();
692 for item in list.iter() {
693 self.params.json[item.to_string().as_str()] = item.to_string().into();
694 }
695 self
696 }
697
698 fn location(&mut self, field: &str) -> &mut Self {
699 let list: Vec<&str> = field.split(",").collect();
700 for item in list.iter() {
701 self.params.location[item.to_string().as_str()] = item.to_string().into();
702 }
703 self
704 }
705
706 fn field(&mut self, field: &str) -> &mut Self {
707 let list: Vec<&str> = field.split(",").collect();
708 let join_table = if self.params.join_table.is_empty() {
709 self.params.table.clone()
710 } else {
711 self.params.join_table.clone()
712 };
713 for item in list.iter() {
714 if item.contains(" as ") {
715 let text = item.split(" as ").collect::<Vec<&str>>();
716 if text[0].contains("count(") {
717 self.params.fields[item.to_string().as_str()] =
718 format!("{} as {}", text[0], text[1]).into();
719 } else {
720 self.params.fields[item.to_string().as_str()] =
721 format!("{}.{} as {}", join_table, text[0], text[1]).into();
722 }
723 } else {
724 self.params.fields[item.to_string().as_str()] =
725 format!("{join_table}.{item}").into();
726 }
727 }
728 self
729 }
730
731 fn field_raw(&mut self, expr: &str) -> &mut Self {
732 self.params.fields[expr] = expr.into();
733 self
734 }
735
736 fn hidden(&mut self, name: &str) -> &mut Self {
737 let hidden: Vec<&str> = name.split(",").collect();
738
739 let fields_list = self.table_info(self.params.clone().table.as_str());
740 let mut data = array![];
741 for item in fields_list.members() {
742 let _ = data.push(object! {
743 "name":item["field"].as_str().unwrap_or("")
744 });
745 }
746
747 for item in data.members() {
748 let name = item["name"].as_str().unwrap_or("");
749 if !hidden.contains(&name) {
750 self.params.fields[name] = name.into();
751 }
752 }
753 self
754 }
755
756 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
757 for f in field.split('|') {
758 if !super::sql_safety::validate_field_name(f) {
759 error!("Invalid field name: {}", f);
760 }
761 }
762 if !super::sql_safety::validate_compare_orator(compare) {
763 error!("Invalid compare operator: {}", compare);
764 }
765 let join_table = if self.params.join_table.is_empty() {
766 self.params.table.clone()
767 } else {
768 self.params.join_table.clone()
769 };
770 if value.is_boolean() {
771 if value.as_bool().unwrap_or(false) {
772 value = 1.into();
773 } else {
774 value = 0.into();
775 }
776 }
777 match compare {
778 "between" => {
779 self.params.where_and.push(format!(
780 "{}.{} between '{}' AND '{}'",
781 join_table, field, value[0], value[1]
782 ));
783 }
784 "set" => {
785 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
786 let mut wheredata = vec![];
787 for item in list.iter() {
788 wheredata.push(format!(
789 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
790 ));
791 }
792 self.params
793 .where_and
794 .push(format!("({})", wheredata.join(" or ")));
795 }
796 "notin" => {
797 let mut text = String::new();
798 for item in value.members() {
799 text = format!("{text},'{item}'");
800 }
801 text = text.trim_start_matches(",").into();
802 self.params
803 .where_and
804 .push(format!("{join_table}.{field} not in ({text})"));
805 }
806 "is" => {
807 self.params
808 .where_and
809 .push(format!("{join_table}.{field} is {value}"));
810 }
811 "isnot" => {
812 self.params
813 .where_and
814 .push(format!("{join_table}.{field} is not {value}"));
815 }
816 "notlike" => {
817 self.params
818 .where_and
819 .push(format!("{join_table}.{field} not like '{value}'"));
820 }
821 "in" => {
822 let mut text = String::new();
823 if value.is_array() {
824 for item in value.members() {
825 text = format!("{text},'{item}'");
826 }
827 } else if value.is_null() {
828 text = format!("{text},null");
829 } else {
830 let value = value.as_str().unwrap_or("");
831
832 let value: Vec<&str> = value.split(",").collect();
833 for item in value.iter() {
834 text = format!("{text},'{item}'");
835 }
836 }
837 text = text.trim_start_matches(",").into();
838
839 self.params
840 .where_and
841 .push(format!("{join_table}.{field} {compare} ({text})"));
842 }
843 _ => {
844 self.params
845 .where_and
846 .push(format!("{join_table}.{field} {compare} '{value}'"));
847 }
848 }
849 self
850 }
851
852 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
853 for f in field.split('|') {
854 if !super::sql_safety::validate_field_name(f) {
855 error!("Invalid field name: {}", f);
856 }
857 }
858 if !super::sql_safety::validate_compare_orator(compare) {
859 error!("Invalid compare operator: {}", compare);
860 }
861 let join_table = if self.params.join_table.is_empty() {
862 self.params.table.clone()
863 } else {
864 self.params.join_table.clone()
865 };
866
867 if value.is_boolean() {
868 if value.as_bool().unwrap_or(false) {
869 value = 1.into();
870 } else {
871 value = 0.into();
872 }
873 }
874
875 match compare {
876 "between" => {
877 self.params.where_or.push(format!(
878 "{}.{} between '{}' AND '{}'",
879 join_table, field, value[0], value[1]
880 ));
881 }
882 "set" => {
883 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
884 let mut wheredata = vec![];
885 for item in list.iter() {
886 wheredata.push(format!(
887 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
888 ));
889 }
890 self.params
891 .where_or
892 .push(format!("({})", wheredata.join(" or ")));
893 }
894 "notin" => {
895 let mut text = String::new();
896 for item in value.members() {
897 text = format!("{text},'{item}'");
898 }
899 text = text.trim_start_matches(",").into();
900 self.params
901 .where_or
902 .push(format!("{join_table}.{field} not in ({text})"));
903 }
904 "is" => {
905 self.params
906 .where_or
907 .push(format!("{join_table}.{field} is {value}"));
908 }
909 "isnot" => {
910 self.params
911 .where_or
912 .push(format!("{join_table}.{field} is not {value}"));
913 }
914 "in" => {
915 let mut text = String::new();
916 if value.is_array() {
917 for item in value.members() {
918 text = format!("{text},'{item}'");
919 }
920 } else {
921 let value = value.as_str().unwrap_or("");
922 let value: Vec<&str> = value.split(",").collect();
923 for item in value.iter() {
924 text = format!("{text},'{item}'");
925 }
926 }
927 text = text.trim_start_matches(",").into();
928 self.params
929 .where_or
930 .push(format!("{join_table}.{field} {compare} ({text})"));
931 }
932 _ => {
933 self.params
934 .where_or
935 .push(format!("{join_table}.{field} {compare} '{value}'"));
936 }
937 }
938 self
939 }
940
941 fn where_raw(&mut self, expr: &str) -> &mut Self {
942 self.params.where_and.push(expr.to_string());
943 self
944 }
945
946 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
947 self.params
948 .where_and
949 .push(format!("\"{field}\" IN ({sub_sql})"));
950 self
951 }
952
953 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
954 self.params
955 .where_and
956 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
957 self
958 }
959
960 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
961 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
962 self
963 }
964
965 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
966 self.params
967 .where_and
968 .push(format!("NOT EXISTS ({sub_sql})"));
969 self
970 }
971
972 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
973 self.params.where_column = format!(
974 "{}.{} {} {}.{}",
975 self.params.table, field_a, compare, self.params.table, field_b
976 );
977 self
978 }
979
980 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
981 self.params
982 .update_column
983 .push(format!("{field_a} = {compare}"));
984 self
985 }
986
987 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
988 self.params.page = page;
989 self.params.limit = limit;
990 self
991 }
992
993 fn limit(&mut self, count: i32) -> &mut Self {
994 self.params.limit_only = count;
995 self
996 }
997
998 fn column(&mut self, field: &str) -> JsonValue {
999 self.field(field);
1000 let sql = self.params.select_sql();
1001
1002 if self.params.sql {
1003 return JsonValue::from(sql);
1004 }
1005 let (state, data) = self.query(sql.as_str());
1006 match state {
1007 true => {
1008 let mut list = array![];
1009 for item in data.members() {
1010 if self.params.json[field].is_empty() {
1011 let _ = list.push(item[field].clone());
1012 } else {
1013 let data =
1014 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1015 let _ = list.push(data);
1016 }
1017 }
1018 list
1019 }
1020 false => {
1021 array![]
1022 }
1023 }
1024 }
1025
1026 fn count(&mut self) -> JsonValue {
1027 self.params.fields["count"] = "count(*) as count".to_string().into();
1028 let sql = self.params.select_sql();
1029 if self.params.sql {
1030 return JsonValue::from(sql.clone());
1031 }
1032 let (state, data) = self.query(sql.as_str());
1033 if state {
1034 data[0]["count"].clone()
1035 } else {
1036 JsonValue::from(0)
1037 }
1038 }
1039
1040 fn max(&mut self, field: &str) -> JsonValue {
1041 self.params.fields[field] = format!("max({field}) as {field}").into();
1042 let sql = self.params.select_sql();
1043 if self.params.sql {
1044 return JsonValue::from(sql.clone());
1045 }
1046 let (state, data) = self.query(sql.as_str());
1047 if state {
1048 if data.len() > 1 {
1049 return data.clone();
1050 }
1051 data[0][field].clone()
1052 } else {
1053 JsonValue::from(0)
1054 }
1055 }
1056
1057 fn min(&mut self, field: &str) -> JsonValue {
1058 self.params.fields[field] = format!("min({field}) as {field}").into();
1059 let sql = self.params.select_sql();
1060 if self.params.sql {
1061 return JsonValue::from(sql.clone());
1062 }
1063 let (state, data) = self.query(sql.as_str());
1064 if state {
1065 if data.len() > 1 {
1066 return data;
1067 }
1068 data[0][field].clone()
1069 } else {
1070 JsonValue::from(0)
1071 }
1072 }
1073
1074 fn sum(&mut self, field: &str) -> JsonValue {
1075 self.params.fields[field] = format!("sum({field}) as {field}").into();
1076 let sql = self.params.select_sql();
1077 if self.params.sql {
1078 return JsonValue::from(sql.clone());
1079 }
1080 let (state, data) = self.query(sql.as_str());
1081 match state {
1082 true => {
1083 if data.len() > 1 {
1084 return data;
1085 }
1086 data[0][field].clone()
1087 }
1088 false => JsonValue::from(0),
1089 }
1090 }
1091
1092 fn avg(&mut self, field: &str) -> JsonValue {
1093 self.params.fields[field] = format!("avg({field}) as {field}").into();
1094 let sql = self.params.select_sql();
1095 if self.params.sql {
1096 return JsonValue::from(sql.clone());
1097 }
1098 let (state, data) = self.query(sql.as_str());
1099 if state {
1100 if data.len() > 1 {
1101 return data;
1102 }
1103 data[0][field].clone()
1104 } else {
1105 JsonValue::from(0)
1106 }
1107 }
1108
1109 fn having(&mut self, expr: &str) -> &mut Self {
1110 self.params.having.push(expr.to_string());
1111 self
1112 }
1113
1114 fn select(&mut self) -> JsonValue {
1115 let sql = self.params.select_sql();
1116 if self.params.sql {
1117 return JsonValue::from(sql.clone());
1118 }
1119 let (state, mut data) = self.query(sql.as_str());
1120 match state {
1121 true => {
1122 for (field, _) in self.params.json.entries() {
1123 for item in data.members_mut() {
1124 if !item[field].is_empty() {
1125 let json = item[field].to_string();
1126 item[field] = match json::parse(&json) {
1127 Ok(e) => e,
1128 Err(_) => JsonValue::from(json),
1129 };
1130 }
1131 }
1132 }
1133 data.clone()
1134 }
1135 false => array![],
1136 }
1137 }
1138
1139 fn find(&mut self) -> JsonValue {
1140 self.params.page = 1;
1141 self.params.limit = 1;
1142 let sql = self.params.select_sql();
1143 if self.params.sql {
1144 return JsonValue::from(sql.clone());
1145 }
1146 let (state, mut data) = self.query(sql.as_str());
1147 match state {
1148 true => {
1149 if data.is_empty() {
1150 return object! {};
1151 }
1152 for (field, _) in self.params.json.entries() {
1153 if !data[0][field].is_empty() {
1154 let json = data[0][field].to_string();
1155 let json = json::parse(&json).unwrap_or(array![]);
1156 data[0][field] = json;
1157 } else {
1158 data[0][field] = array![];
1159 }
1160 }
1161 data[0].clone()
1162 }
1163 false => {
1164 object! {}
1165 }
1166 }
1167 }
1168
1169 fn value(&mut self, field: &str) -> JsonValue {
1170 self.params.fields = object! {};
1171 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1172 self.params.page = 1;
1173 self.params.limit = 1;
1174 let sql = self.params.select_sql();
1175 if self.params.sql {
1176 return JsonValue::from(sql.clone());
1177 }
1178 let (state, mut data) = self.query(sql.as_str());
1179 match state {
1180 true => {
1181 for (field, _) in self.params.json.entries() {
1182 if !data[0][field].is_empty() {
1183 let json = data[0][field].to_string();
1184 let json = json::parse(&json).unwrap_or(array![]);
1185 data[0][field] = json;
1186 } else {
1187 data[0][field] = array![];
1188 }
1189 }
1190 data[0][field].clone()
1191 }
1192 false => {
1193 if self.connection.debug {
1194 info!("{data:?}");
1195 }
1196 JsonValue::Null
1197 }
1198 }
1199 }
1200
1201 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1202 let mut fields = vec![];
1203 let mut values = vec![];
1204 if !self.params.autoinc && data["id"].is_empty() {
1205 let thread_id = format!("{:?}", std::thread::current().id());
1206 let thread_num: u64 = thread_id
1207 .trim_start_matches("ThreadId(")
1208 .trim_end_matches(")")
1209 .parse()
1210 .unwrap_or(0);
1211 data["id"] = format!(
1212 "{:X}{:X}",
1213 Local::now().timestamp_nanos_opt().unwrap_or(0),
1214 thread_num
1215 )
1216 .into();
1217 }
1218 for (field, value) in data.entries() {
1219 fields.push(format!("\"{}\"", field));
1220
1221 if value.is_string() {
1222 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1223 continue;
1224 } else if value.is_array() {
1225 if self.params.json[field].is_empty() {
1226 let array = value
1227 .members()
1228 .map(|x| x.as_str().unwrap_or(""))
1229 .collect::<Vec<&str>>()
1230 .join(",");
1231 values.push(format!("'{array}'"));
1232 } else {
1233 let json = value.to_string();
1234 let json = json.replace("'", "''");
1235 values.push(format!("'{json}'"));
1236 }
1237 continue;
1238 } else if value.is_object() {
1239 if self.params.json[field].is_empty() {
1240 values.push(format!("'{value}'"));
1241 } else {
1242 let json = value.to_string();
1243 let json = json.replace("'", "''");
1244 values.push(format!("'{json}'"));
1245 }
1246 continue;
1247 } else if value.is_number() || value.is_boolean() || value.is_null() {
1248 values.push(format!("{value}"));
1249 continue;
1250 } else {
1251 values.push(format!("'{value}'"));
1252 continue;
1253 }
1254 }
1255 let fields = fields.join(",");
1256 let values = values.join(",");
1257
1258 let sql = format!(
1259 "INSERT INTO {} ({}) VALUES ({});",
1260 self.params.table, fields, values
1261 );
1262 if self.params.sql {
1263 return JsonValue::from(sql.clone());
1264 }
1265 let (state, ids) = self.execute(sql.as_str());
1266
1267 match state {
1268 true => match self.params.autoinc {
1269 true => ids.clone(),
1270 false => data["id"].clone(),
1271 },
1272 false => {
1273 let thread_id = format!("{:?}", thread::current().id());
1274 error!("添加失败: {thread_id} {ids:?} {sql}");
1275 JsonValue::from("")
1276 }
1277 }
1278 }
1279 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1280 let mut fields = String::new();
1281 if !self.params.autoinc && data[0]["id"].is_empty() {
1282 data[0]["id"] = "".into();
1283 }
1284 for (field, _) in data[0].entries() {
1285 fields = format!("{fields},\"{field}\"");
1286 }
1287 fields = fields.trim_start_matches(",").to_string();
1288
1289 let core_count = num_cpus::get();
1290 let mut p = pools::Pool::new(core_count * 4);
1291
1292 let autoinc = self.params.autoinc;
1293 for list in data.members() {
1294 let mut item = list.clone();
1295 let i = br_fields::str::Code::verification_code(3);
1296 p.execute(move |pcindex| {
1297 if !autoinc && item["id"].is_empty() {
1298 let id = format!(
1299 "{:X}{:X}{}",
1300 Local::now().timestamp_nanos_opt().unwrap_or(0),
1301 pcindex,
1302 i
1303 );
1304 item["id"] = id.into();
1305 }
1306 let mut values = "".to_string();
1307 for (_, value) in item.entries() {
1308 if value.is_string() {
1309 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1310 } else if value.is_number() {
1311 values = format!("{values},{value}");
1312 } else if value.is_boolean() {
1313 values = format!("{values},{value}");
1314 continue;
1315 } else {
1316 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1317 }
1318 }
1319 values = format!("({})", values.trim_start_matches(","));
1320 array![item["id"].clone(), values]
1321 });
1322 }
1323 let (ids_list, mut values) = p.insert_all();
1324 values = values.trim_start_matches(",").to_string();
1325 let sql = format!(
1326 "INSERT INTO {} ({}) VALUES {};",
1327 self.params.table, fields, values
1328 );
1329
1330 if self.params.sql {
1331 return JsonValue::from(sql.clone());
1332 }
1333 let (state, data) = self.execute(sql.as_str());
1334 match state {
1335 true => match autoinc {
1336 true => data,
1337 false => JsonValue::from(ids_list),
1338 },
1339 false => {
1340 error!("insert_all: {data:?}");
1341 array![]
1342 }
1343 }
1344 }
1345 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1346 let mut fields = vec![];
1347 let mut values = vec![];
1348 if !self.params.autoinc && data["id"].is_empty() {
1349 let thread_id = format!("{:?}", std::thread::current().id());
1350 let thread_num: u64 = thread_id
1351 .trim_start_matches("ThreadId(")
1352 .trim_end_matches(")")
1353 .parse()
1354 .unwrap_or(0);
1355 data["id"] = format!(
1356 "{:X}{:X}",
1357 Local::now().timestamp_nanos_opt().unwrap_or(0),
1358 thread_num
1359 )
1360 .into();
1361 }
1362 for (field, value) in data.entries() {
1363 fields.push(format!("\"{}\"", field));
1364
1365 if value.is_string() {
1366 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1367 continue;
1368 } else if value.is_array() {
1369 if self.params.json[field].is_empty() {
1370 let array = value
1371 .members()
1372 .map(|x| x.as_str().unwrap_or(""))
1373 .collect::<Vec<&str>>()
1374 .join(",");
1375 values.push(format!("'{array}'"));
1376 } else {
1377 let json = value.to_string();
1378 let json = json.replace("'", "''");
1379 values.push(format!("'{json}'"));
1380 }
1381 continue;
1382 } else if value.is_object() {
1383 if self.params.json[field].is_empty() {
1384 values.push(format!("'{value}'"));
1385 } else {
1386 let json = value.to_string();
1387 let json = json.replace("'", "''");
1388 values.push(format!("'{json}'"));
1389 }
1390 continue;
1391 } else if value.is_number() || value.is_boolean() || value.is_null() {
1392 values.push(format!("{value}"));
1393 continue;
1394 } else {
1395 values.push(format!("'{value}'"));
1396 continue;
1397 }
1398 }
1399
1400 let conflict_cols: Vec<String> = conflict_fields
1401 .iter()
1402 .map(|f| format!("\"{}\"", f))
1403 .collect();
1404
1405 let update_set: Vec<String> = fields
1406 .iter()
1407 .filter(|f| {
1408 let name = f.trim_matches('"');
1409 !conflict_fields.contains(&name) && name != "id"
1410 })
1411 .map(|f| format!("{f}=EXCLUDED.{f}"))
1412 .collect();
1413
1414 let fields_str = fields.join(",");
1415 let values_str = values.join(",");
1416
1417 let sql = format!(
1418 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1419 self.params.table,
1420 fields_str,
1421 values_str,
1422 conflict_cols.join(","),
1423 update_set.join(",")
1424 );
1425 if self.params.sql {
1426 return JsonValue::from(sql.clone());
1427 }
1428 let (state, result) = self.execute(sql.as_str());
1429 match state {
1430 true => match self.params.autoinc {
1431 true => result.clone(),
1432 false => data["id"].clone(),
1433 },
1434 false => {
1435 let thread_id = format!("{:?}", thread::current().id());
1436 error!("upsert失败: {thread_id} {result:?} {sql}");
1437 JsonValue::from("")
1438 }
1439 }
1440 }
1441 fn update(&mut self, data: JsonValue) -> JsonValue {
1442 let mut values = vec![];
1443 for (field, value) in data.entries() {
1444 if value.is_string() {
1445 values.push(format!(
1446 "\"{}\"='{}'",
1447 field,
1448 value.to_string().replace("'", "''")
1449 ));
1450 } else if value.is_number() {
1451 values.push(format!("\"{field}\"= {value}"));
1452 } else if value.is_array() {
1453 if self.params.json[field].is_empty() {
1454 let array = value
1455 .members()
1456 .map(|x| x.as_str().unwrap_or(""))
1457 .collect::<Vec<&str>>()
1458 .join(",");
1459 values.push(format!("\"{field}\"='{array}'"));
1460 } else {
1461 let json = value.to_string();
1462 let json = json.replace("'", "''");
1463 values.push(format!("\"{field}\"='{json}'"));
1464 }
1465 continue;
1466 } else if value.is_object() {
1467 if self.params.json[field].is_empty() {
1468 values.push(format!("\"{field}\"='{value}'"));
1469 } else {
1470 if value.is_empty() {
1471 values.push(format!("\"{field}\"=''"));
1472 continue;
1473 }
1474 let json = value.to_string();
1475 let json = json.replace("'", "''");
1476 values.push(format!("\"{field}\"='{json}'"));
1477 }
1478 continue;
1479 } else if value.is_boolean() {
1480 values.push(format!("\"{field}\"= {value}"));
1481 } else {
1482 values.push(format!("\"{field}\"=\"{value}\""));
1483 }
1484 }
1485
1486 for (field, value) in self.params.inc_dec.entries() {
1487 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1488 }
1489 if !self.params.update_column.is_empty() {
1490 values.extend(self.params.update_column.clone());
1491 }
1492 let values = values.join(",");
1493
1494 let sql = format!(
1495 "UPDATE {} SET {} {};",
1496 self.params.table.clone(),
1497 values,
1498 self.params.where_sql()
1499 );
1500 if self.params.sql {
1501 return JsonValue::from(sql.clone());
1502 }
1503 let (state, data) = self.execute(sql.as_str());
1504 if state {
1505 data
1506 } else {
1507 let thread_id = format!("{:?}", thread::current().id());
1508 error!("update: {thread_id} {data:?} {sql}");
1509 0.into()
1510 }
1511 }
1512 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1513 let mut values = vec![];
1514
1515 let mut ids = vec![];
1516 for (field, _) in data[0].entries() {
1517 if field == "id" {
1518 continue;
1519 }
1520 let mut fields = vec![];
1521 for row in data.members() {
1522 let value = row[field].clone();
1523 let id = row["id"].clone();
1524 ids.push(id.clone());
1525 if value.is_string() {
1526 fields.push(format!(
1527 "WHEN '{}' THEN '{}'",
1528 id,
1529 value.to_string().replace("'", "''")
1530 ));
1531 } else if value.is_array() || value.is_object() {
1532 if self.params.json[field].is_empty() {
1533 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1534 } else {
1535 let json = value.to_string();
1536 let json = json.replace("'", "''");
1537 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1538 }
1539 continue;
1540 } else if value.is_number() || value.is_boolean() || value.is_null() {
1541 fields.push(format!("WHEN '{id}' THEN {value}"));
1542 } else {
1543 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1544 }
1545 }
1546 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1547 }
1548 self.where_and("id", "in", ids.into());
1549 for (field, value) in self.params.inc_dec.entries() {
1550 values.push(format!("{} = {}", field, value.to_string().clone()));
1551 }
1552
1553 let values = values.join(",");
1554 let sql = format!(
1555 "UPDATE {} SET {} {} {};",
1556 self.params.table.clone(),
1557 values,
1558 self.params.where_sql(),
1559 self.params.page_limit_sql()
1560 );
1561 if self.params.sql {
1562 return JsonValue::from(sql.clone());
1563 }
1564 let (state, data) = self.execute(sql.as_str());
1565 if state {
1566 data
1567 } else {
1568 error!("update_all: {data:?}");
1569 JsonValue::from(0)
1570 }
1571 }
1572
1573 fn delete(&mut self) -> JsonValue {
1574 let sql = format!(
1575 "delete FROM {} {} {};",
1576 self.params.table.clone(),
1577 self.params.where_sql(),
1578 self.params.page_limit_sql()
1579 );
1580 if self.params.sql {
1581 return JsonValue::from(sql.clone());
1582 }
1583 let (state, data) = self.execute(sql.as_str());
1584 match state {
1585 true => data,
1586 false => {
1587 error!("delete 失败>>> {data:?}");
1588 JsonValue::from(0)
1589 }
1590 }
1591 }
1592
1593 fn transaction(&mut self) -> bool {
1594 let thread_id = format!("{:?}", thread::current().id());
1595 let key = format!("{}{}", self.default, thread_id);
1596
1597 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1598 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1599 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1600 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1601 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1602 return true;
1603 }
1604
1605 let mut conn = match self.client.get_connect_for_transaction() {
1606 Ok(c) => c,
1607 Err(e) => {
1608 error!("获取事务连接失败: {e}");
1609 return false;
1610 }
1611 };
1612
1613 if !conn.is_valid() {
1614 error!("事务连接无效");
1615 self.client.release_transaction_conn();
1616 return false;
1617 }
1618
1619 if let Err(e) = conn.execute("START TRANSACTION") {
1620 error!("启动事务失败: {e}");
1621 self.client.release_transaction_conn();
1622 return false;
1623 }
1624
1625 if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1626 error!("设置事务隔离级别失败: {e}");
1627 let _ = conn.execute("ROLLBACK");
1628 self.client.release_transaction_conn();
1629 return false;
1630 }
1631
1632 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1633 true
1634 }
1635 fn commit(&mut self) -> bool {
1636 let thread_id = format!("{:?}", thread::current().id());
1637 let key = format!("{}{}", self.default, thread_id);
1638
1639 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1640 error!("commit: 没有活跃的事务");
1641 return false;
1642 }
1643
1644 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1645 if depth > 1 {
1646 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1647 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1648 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1649 return true;
1650 }
1651
1652 let commit_result =
1653 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1654
1655 let success = match commit_result {
1656 Some(Ok(_)) => true,
1657 Some(Err(e)) => {
1658 error!("提交事务失败: {e}");
1659 false
1660 }
1661 None => {
1662 error!("提交事务失败: 未找到连接");
1663 false
1664 }
1665 };
1666
1667 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1668 self.client.release_transaction_conn();
1669 success
1670 }
1671
1672 fn rollback(&mut self) -> bool {
1673 let thread_id = format!("{:?}", thread::current().id());
1674 let key = format!("{}{}", self.default, thread_id);
1675
1676 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1677 error!("rollback: 没有活跃的事务");
1678 return false;
1679 }
1680
1681 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1682 if depth > 1 {
1683 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1684 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1685 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1686 return true;
1687 }
1688
1689 let rollback_result =
1690 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1691
1692 let success = match rollback_result {
1693 Some(Ok(_)) => true,
1694 Some(Err(e)) => {
1695 error!("回滚失败: {e}");
1696 false
1697 }
1698 None => {
1699 error!("回滚失败: 未找到连接");
1700 false
1701 }
1702 };
1703
1704 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1705 self.client.release_transaction_conn();
1706 success
1707 }
1708
1709 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1710 let (state, data) = self.query(sql);
1711 match state {
1712 true => Ok(data),
1713 false => Err(data.to_string()),
1714 }
1715 }
1716
1717 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1718 let (state, data) = self.execute(sql);
1719 match state {
1720 true => Ok(data),
1721 false => Err(data.to_string()),
1722 }
1723 }
1724
1725 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1726 self.params.inc_dec[field] = format!("{field} + {num}").into();
1727 self
1728 }
1729
1730 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1731 self.params.inc_dec[field] = format!("{field} - {num}").into();
1732 self
1733 }
1734 fn buildsql(&mut self) -> String {
1735 self.fetch_sql();
1736 let sql = self.select().to_string();
1737 format!("( {} ) {}", sql, self.params.table)
1738 }
1739
1740 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1741 for field in fields {
1742 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1743 }
1744 self
1745 }
1746
1747 fn join(
1748 &mut self,
1749 main_table: &str,
1750 main_fields: &str,
1751 right_table: &str,
1752 right_fields: &str,
1753 ) -> &mut Self {
1754 let main_table = if main_table.is_empty() {
1755 self.params.table.clone()
1756 } else {
1757 main_table.to_string()
1758 };
1759 self.params.join_table = right_table.to_string();
1760 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1761 self
1762 }
1763
1764 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1765 let main_fields = if main_fields.is_empty() {
1766 "id"
1767 } else {
1768 main_fields
1769 };
1770 let second_fields = if second_fields.is_empty() {
1771 self.params.table.clone()
1772 } else {
1773 second_fields.to_string().clone()
1774 };
1775 let sec_table_name = format!("{}{}", table, "_2");
1776 let second_table = format!("{} {}", table, sec_table_name.clone());
1777 self.params.join_table = sec_table_name.clone();
1778 self.params.join.push(format!(
1779 " INNER JOIN {} ON {}.{} = {}.{}",
1780 second_table, self.params.table, main_fields, sec_table_name, second_fields
1781 ));
1782 self
1783 }
1784
1785 fn join_right(
1786 &mut self,
1787 main_table: &str,
1788 main_fields: &str,
1789 right_table: &str,
1790 right_fields: &str,
1791 ) -> &mut Self {
1792 let main_table = if main_table.is_empty() {
1793 self.params.table.clone()
1794 } else {
1795 main_table.to_string()
1796 };
1797 self.params.join_table = right_table.to_string();
1798 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1799 self
1800 }
1801
1802 fn join_full(
1803 &mut self,
1804 main_table: &str,
1805 main_fields: &str,
1806 right_table: &str,
1807 right_fields: &str,
1808 ) -> &mut Self {
1809 let main_table = if main_table.is_empty() {
1810 self.params.table.clone()
1811 } else {
1812 main_table.to_string()
1813 };
1814 self.params.join_table = right_table.to_string();
1815 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1816 self
1817 }
1818
1819 fn union(&mut self, sub_sql: &str) -> &mut Self {
1820 self.params.unions.push(format!("UNION {sub_sql}"));
1821 self
1822 }
1823
1824 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1825 self.params.unions.push(format!("UNION ALL {sub_sql}"));
1826 self
1827 }
1828
1829 fn lock_for_update(&mut self) -> &mut Self {
1830 self.params.lock_mode = "FOR UPDATE".to_string();
1831 self
1832 }
1833
1834 fn lock_for_share(&mut self) -> &mut Self {
1835 self.params.lock_mode = "FOR SHARE".to_string();
1836 self
1837 }
1838}