1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use chrono::Local;
7use json::{array, object, JsonValue};
8use log::{error, info};
9use std::thread;
/// PostgreSQL backend handle: the connection settings it was built from, a
/// `default` name, the query-builder state and a connection pool.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings handed to `Pgsql::connect`.
    pub connection: Connection,
    /// Name handed to `Pgsql::connect`; `query`/`execute` combine it with the
    /// current thread id to form the transaction-manager lookup key.
    pub default: String,
    /// Builder state (table, fields, conditions, paging, ...) for the next statement.
    pub params: Params,
    /// Pool that supplies connections for statements run outside a transaction.
    pub client: Pools,
}
19
20impl Pgsql {
21 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
22 let port = connection
23 .hostport
24 .parse::<i32>()
25 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
26
27 let cp_connection = connection.clone();
28 let config = object! {
29 debug: cp_connection.debug,
30 username: cp_connection.username,
31 userpass: cp_connection.userpass,
32 database: cp_connection.database,
33 hostname: cp_connection.hostname,
34 hostport: port,
35 charset: cp_connection.charset.str(),
36 };
37 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
38
39 let pools = pgsql.pools()?;
40 Ok(Self {
41 connection,
42 default: default.clone(),
43 params: Params::default("pgsql"),
44 client: pools,
45 })
46 }
47
48 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
49 let thread_id = format!("{:?}", thread::current().id());
50 let key = format!("{}{}", self.default, thread_id);
51
52 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
53 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
54
55 match result {
56 Some(Ok(e)) => {
57 if self.connection.debug {
58 info!("查询成功: {} {}", thread_id.clone(), sql);
59 }
60 (true, e.rows)
61 }
62 Some(Err(e)) => {
63 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
64 (false, JsonValue::from(e.to_string()))
65 }
66 None => {
67 error!("事务查询失败: 未找到事务连接 {thread_id}");
68 (false, JsonValue::from("未找到事务连接"))
69 }
70 }
71 } else {
72 let mut guard = match self.client.get_guard() {
73 Ok(g) => g,
74 Err(e) => {
75 error!(
76 "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
77 );
78 return (false, JsonValue::from(e.to_string()));
79 }
80 };
81
82 let res = guard.conn().query(sql);
83 match res {
84 Ok(e) => {
85 if self.connection.debug {
86 info!("查询成功: {} {}", thread_id.clone(), sql);
87 }
88 (true, e.rows)
89 }
90 Err(e) => {
91 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
92 (false, JsonValue::from(e.to_string()))
93 }
94 }
95 }
96 }
97 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
98 let thread_id = format!("{:?}", thread::current().id());
99 let key = format!("{}{}", self.default, thread_id);
100
101 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
102 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
103
104 match result {
105 Some(Ok(e)) => {
106 if self.connection.debug {
107 info!("提交成功: {} {}", thread_id.clone(), sql);
108 }
109 if sql.contains("INSERT") {
110 (true, e.rows)
111 } else {
112 (true, e.affect_count.into())
113 }
114 }
115 Some(Err(e)) => {
116 error!("事务提交失败: {thread_id} {e} {sql}");
117 (false, JsonValue::from(e.to_string()))
118 }
119 None => {
120 error!("事务执行失败: 未找到事务连接 {thread_id}");
121 (false, JsonValue::from("未找到事务连接"))
122 }
123 }
124 } else {
125 let mut guard = match self.client.get_guard() {
126 Ok(g) => g,
127 Err(e) => {
128 error!(
129 "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
130 );
131 return (false, JsonValue::from(e.to_string()));
132 }
133 };
134
135 let res = guard.conn().execute(sql);
136 match res {
137 Ok(e) => {
138 if self.connection.debug {
139 info!("提交成功: {} {}", thread_id.clone(), sql);
140 }
141 if sql.contains("INSERT") {
142 (true, e.rows)
143 } else {
144 (true, e.affect_count.into())
145 }
146 }
147 Err(e) => {
148 error!("非事务提交失败: {thread_id} {e} {sql}");
149 (false, JsonValue::from(e.to_string()))
150 }
151 }
152 }
153 }
154}
155
156impl DbMode for Pgsql {
157 fn database_tables(&mut self) -> JsonValue {
158 let sql = "SHOW TABLES".to_string();
159 match self.sql(sql.as_str()) {
160 Ok(e) => {
161 let mut list = vec![];
162 for item in e.members() {
163 for (_, value) in item.entries() {
164 list.push(value.clone());
165 }
166 }
167 list.into()
168 }
169 Err(_) => {
170 array![]
171 }
172 }
173 }
174
175 fn database_create(&mut self, name: &str) -> bool {
176 let sql = format!("CREATE DATABASE {name}");
177
178 let (state, data) = self.execute(sql.as_str());
179 match state {
180 true => data.as_bool().unwrap_or(true),
181 false => {
182 error!("创建数据库失败: {data:?}");
183 false
184 }
185 }
186 }
187
188 fn truncate(&mut self, table: &str) -> bool {
189 let sql = format!("TRUNCATE TABLE {table}");
190 let (state, _) = self.execute(sql.as_str());
191 state
192 }
193}
194
195impl Mode for Pgsql {
196 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
197 let mut sql = String::new();
198 let mut comments = vec![];
199
200 if !options.table_unique.is_empty() {
201 let full_name = format!(
202 "{}_unique_{}",
203 options.table_name,
204 options.table_unique.join("_")
205 );
206 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
207 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
208 let unique = format!(
209 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
210 name,
211 options.table_name,
212 options.table_unique.join(",")
213 );
214 comments.push(unique);
215 }
216
217 for row in options.table_index.iter() {
218 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
219 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
220 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
221 let index = format!(
222 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
223 name,
224 options.table_name,
225 row.join(",")
226 );
227 comments.push(index);
228 }
229
230 for (name, field) in options.table_fields.entries_mut() {
231 field["table_name"] = options.table_name.clone().into();
232 let row = br_fields::field("pgsql", name, field.clone());
233 let (col_sql, meta) = if let Some(idx) = row.find("--") {
234 (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
235 } else {
236 (row.trim(), None)
237 };
238 if let Some(meta) = meta {
239 comments.push(format!(
240 "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
241 options.table_name, name, meta
242 ));
243 }
244 sql = format!("{} {},\r\n", sql, col_sql);
245 }
246
247 let primary_key = format!(
248 "CONSTRAINT {}_{} PRIMARY KEY ({})",
249 options.table_name, options.table_key, options.table_key
250 );
251 let sql = format!(
252 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
253 options.table_name,
254 sql.trim_end_matches(",\r\n"),
255 primary_key
256 );
257 comments.insert(0, sql);
258
259 for (_name, field) in options.table_fields.entries() {
260 let _ = field["mode"].as_str();
261 }
262
263 if self.params.sql {
264 let info = comments.join("\r\n");
265 return JsonValue::from(info);
266 }
267 for comment in comments {
268 let (state, _) = self.execute(comment.as_str());
269 match state {
270 true => {}
271 false => {
272 return JsonValue::from(state);
273 }
274 }
275 }
276 JsonValue::from(true)
277 }
278
    /// Brings an existing table in line with `options` by diffing the columns
    /// reported by `table_info` and the indexes listed in `pg_indexes` against
    /// the requested definition.
    ///
    /// Returns the generated script as a string in fetch-sql mode
    /// (`self.params.sql`), `-1` when no statement is needed, `0` when a
    /// statement fails (execution stops there) and `1` when all succeed.
    fn table_update(&mut self, options: TableOptions) -> JsonValue {
        let fields_list = self.table_info(&options.table_name);
        // Columns to alter, to add and to drop; `comments` collects the statements.
        let mut put = vec![];
        let mut add = vec![];
        let mut del = vec![];
        let mut comments = vec![];

        // A column that exists in the database but not in `options` is dropped.
        for (key, _) in fields_list.entries() {
            if options.table_fields[key].is_empty() {
                del.push(key);
            }
        }
        for (name, field) in options.table_fields.entries() {
            if !fields_list[name].is_empty() {
                let old_info = &fields_list[name];
                // NOTE(review): `br_fields::field` appears to return the column
                // definition, optionally followed by `--` and a comment — confirm.
                let new_field_sql = br_fields::field("pgsql", name, field.clone());

                let old_comment = old_info["comment"].as_str().unwrap_or("");
                let old_type = old_info["type"].as_str().unwrap_or("");

                let new_comment = if let Some(idx) = new_field_sql.find("--") {
                    new_field_sql[idx + 2..].trim()
                } else {
                    ""
                };

                // Comments are compared loosely:
                // - both start with `code|`: only the first four `|`-separated
                //   parts are compared (when both have at least four);
                // - both non-empty with the same number (>= 2) of parts: parts
                //   equal to "true"/"false" are ignored;
                // - otherwise the comments must be identical.
                let comment_matches =
                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
                        let old_parts: Vec<&str> = old_comment.split('|').collect();
                        let new_parts: Vec<&str> = new_comment.split('|').collect();
                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
                            old_parts[..4] == new_parts[..4]
                        } else {
                            old_comment == new_comment
                        }
                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
                        let old_parts: Vec<&str> = old_comment.split('|').collect();
                        let new_parts: Vec<&str> = new_comment.split('|').collect();
                        if old_parts.len() >= 2
                            && new_parts.len() >= 2
                            && old_parts.len() == new_parts.len()
                        {
                            let old_filtered: Vec<&str> = old_parts
                                .iter()
                                .filter(|v| **v != "true" && **v != "false")
                                .copied()
                                .collect();
                            let new_filtered: Vec<&str> = new_parts
                                .iter()
                                .filter(|v| **v != "true" && **v != "false")
                                .copied()
                                .collect();
                            old_filtered == new_filtered
                        } else {
                            old_comment == new_comment
                        }
                    } else {
                        old_comment == new_comment
                    };

                // The new type is the second whitespace-separated token of the
                // generated column definition, lower-cased.
                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
                let new_type = if sql_parts.len() > 1 {
                    sql_parts[1].to_lowercase()
                } else {
                    String::new()
                };

                // Map the type name reported by information_schema onto the
                // spellings that may appear in the generated definition.
                let type_matches = match old_type {
                    "integer" => {
                        new_type.contains("int")
                            && !new_type.contains("bigint")
                            && !new_type.contains("smallint")
                    }
                    "bigint" => new_type.contains("bigint"),
                    "smallint" => new_type.contains("smallint"),
                    "boolean" => new_type.contains("boolean"),
                    "text" => new_type.contains("text"),
                    "character varying" => new_type.contains("varchar"),
                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
                    "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
                    "double precision" => {
                        new_type.contains("double") || new_type.contains("float8")
                    }
                    "real" => new_type.contains("real") || new_type.contains("float4"),
                    "timestamp without time zone" | "timestamp with time zone" => {
                        new_type.contains("timestamp")
                    }
                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
                    "time without time zone" | "time with time zone" => {
                        new_type.contains("time") && !new_type.contains("timestamp")
                    }
                    "json" | "jsonb" => new_type.contains("json"),
                    "uuid" => new_type.contains("uuid"),
                    "bytea" => new_type.contains("bytea"),
                    _ => old_type == new_type,
                };

                // Nothing to do for this column when both type and comment agree.
                if type_matches && comment_matches {
                    continue;
                }

                log::debug!(
                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
                    options.table_name,
                    name,
                    type_matches,
                    old_type,
                    new_type,
                    comment_matches
                );
                put.push(name);
            } else {
                add.push(name);
            }
        }

        // New columns: ADD the definition, then attach the comment if there is one.
        for name in add.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            let rows = row.split("--").collect::<Vec<&str>>();
            comments.push(format!(
                r#"ALTER TABLE "{}" add {};"#,
                options.table_name,
                rows[0].trim()
            ));
            if rows.len() > 1 {
                comments.push(format!(
                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
                    options.table_name,
                    name,
                    rows[1].trim()
                ));
            }
        }
        for name in del.iter() {
            comments.push(format!(
                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
                options.table_name, name
            ));
        }
        // Changed columns: retype, reset the default, relax NOT NULL, refresh the comment.
        for name in put.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            let rows = row.split("--").collect::<Vec<&str>>();

            // NOTE(review): `sql[1]` panics if the definition has fewer than two
            // space-separated tokens — confirm `br_fields::field` never yields that.
            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();

            if sql[1].contains("BOOLEAN") {
                // The existing default is dropped before converting the column
                // with an explicit `USING <column>::boolean` cast.
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
                    options.table_name, name
                );
                comments.push(text.clone());
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
                    options.table_name, name, sql[1]
                );
                comments.push(text.clone());
            } else {
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
                    options.table_name, name, sql[1]
                );
                comments.push(text.clone());
            };

            // Everything after " default " (9 bytes) is re-applied as the column default.
            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
                let default_value = rows[0][default_pos + 9..].trim();
                if !default_value.is_empty() {
                    comments.push(format!(
                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
                        options.table_name, name, default_value
                    ));
                }
            }
            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
                .as_str()
                .unwrap_or("YES");
            let old_is_required = old_is_nullable == "NO";

            // A NOT NULL constraint on any column other than the key is removed.
            if old_is_required && name != options.table_key {
                comments.push(format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
                    options.table_name, name
                ));
            }

            if rows.len() > 1 {
                comments.push(format!(
                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
                    options.table_name,
                    name,
                    rows[1].trim()
                ));
            }
        }

        // Existing indexes, by kind. Despite the `_new` suffix these hold the
        // names currently in the database; whatever is still listed after the
        // requested indexes are removed below gets dropped.
        let mut unique_new = vec![];
        let mut index_new = vec![];
        // Collected but not read again in this function.
        let mut primary_key = vec![];
        let (_, index_list) = self.query(
            format!(
                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
                options.table_name
            )
            .as_str(),
        );
        for item in index_list.members() {
            let key_name = item["indexname"].as_str().unwrap_or("");
            let indexdef = item["indexdef"].to_string();

            // The index backing the `{table}_{key}` primary-key constraint.
            if indexdef.contains(
                format!(
                    "CREATE UNIQUE INDEX {}_{} ON",
                    options.table_name, options.table_key
                )
                .as_str(),
            ) {
                primary_key.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE UNIQUE INDEX") {
                unique_new.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE INDEX") {
                index_new.push(key_name.to_string());
                continue;
            }
        }

        // Requested unique index: create it when missing and keep it off the drop list.
        if !options.table_unique.is_empty() {
            let full_name = format!(
                "{}_unique_{}",
                options.table_name,
                options.table_unique.join("_")
            );
            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
            let unique = format!(
                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
                name,
                options.table_name,
                options.table_unique.join(",")
            );
            if !unique_new.contains(&name) {
                comments.push(unique);
            }
            unique_new.retain(|x| *x != name);
        }

        // Requested secondary indexes: same treatment.
        for row in options.table_index.iter() {
            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
            let index = format!(
                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
                name,
                options.table_name,
                row.join(",")
            );
            if !index_new.contains(&name) {
                comments.push(index);
            }
            index_new.retain(|x| *x != name);
        }

        // Whatever remains is no longer requested and is dropped; names ending
        // in `_pkey` are always kept.
        for item in unique_new {
            if item.ends_with("_pkey") {
                continue;
            }
            // Names starting with `unique_` are removed as constraints rather
            // than as plain indexes.
            if item.starts_with("unique_") {
                comments.push(format!(
                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
                    options.table_name,
                    item.clone()
                ));
            } else {
                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
            }
        }
        for item in index_new {
            if item.ends_with("_pkey") {
                continue;
            }
            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
        }

        // Fetch-sql mode: return the script instead of executing it.
        if self.params.sql {
            return JsonValue::from(comments.join(""));
        }

        // -1 signals that the table already matches `options`.
        if comments.is_empty() {
            return JsonValue::from(-1);
        }

        for item in comments.iter() {
            let (state, res) = self.execute(item.as_str());
            match state {
                true => {}
                false => {
                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
                    return JsonValue::from(0);
                }
            }
        }
        JsonValue::from(1)
    }
590
591 fn table_info(&mut self, table: &str) -> JsonValue {
592 let sql = format!(
593 "SELECT COL.COLUMN_NAME,
594 COL.DATA_TYPE,
595 COL.IS_NULLABLE,
596 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
597 LEFT JOIN
598 pg_catalog.pg_description DESCRIPTION
599 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
600 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
601 let (state, data) = self.query(sql.as_str());
602 let mut list = object! {};
603 if state {
604 for item in data.members() {
605 let mut row = object! {};
606 row["field"] = item["column_name"].clone();
607 row["comment"] = item["comment"].clone();
608 row["type"] = item["data_type"].clone();
609 row["is_nullable"] = item["is_nullable"].clone();
610 if let Some(field_name) = row["field"].as_str() {
611 list[field_name] = row.clone();
612 }
613 }
614 list
615 } else {
616 list
617 }
618 }
619
620 fn table_is_exist(&mut self, name: &str) -> bool {
621 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
622 let (state, data) = self.query(sql.as_str());
623 match state {
624 true => {
625 for item in data.members() {
626 if item.has_key("exists") {
627 return item["exists"].as_bool().unwrap_or(false);
628 }
629 }
630 false
631 }
632 false => false,
633 }
634 }
635
636 fn table(&mut self, name: &str) -> &mut Pgsql {
637 self.params = Params::default(self.connection.mode.str().as_str());
638 let table_name = format!("{}{}", self.connection.prefix, name);
639 if !super::sql_safety::validate_table_name(&table_name) {
640 error!("Invalid table name: {}", name);
641 }
642 self.params.table = table_name.clone();
643 self.params.join_table = table_name;
644 self
645 }
646
647 fn change_table(&mut self, name: &str) -> &mut Self {
648 self.params.join_table = name.to_string();
649 self
650 }
651
652 fn autoinc(&mut self) -> &mut Self {
653 self.params.autoinc = true;
654 self
655 }
656
657 fn timestamps(&mut self) -> &mut Self {
658 self.params.timestamps = true;
659 self
660 }
661
662 fn fetch_sql(&mut self) -> &mut Self {
663 self.params.sql = true;
664 self
665 }
666
667 fn order(&mut self, field: &str, by: bool) -> &mut Self {
668 self.params.order[field] = {
669 if by {
670 "DESC"
671 } else {
672 "ASC"
673 }
674 }
675 .into();
676 self
677 }
678
679 fn group(&mut self, field: &str) -> &mut Self {
680 let fields: Vec<&str> = field.split(",").collect();
681 for field in fields.iter() {
682 let field = field.to_string();
683 self.params.group[field.as_str()] = field.clone().into();
684 self.params.fields[field.as_str()] = field.clone().into();
685 }
686 self
687 }
688
689 fn distinct(&mut self) -> &mut Self {
690 self.params.distinct = true;
691 self
692 }
693
694 fn json(&mut self, field: &str) -> &mut Self {
695 let list: Vec<&str> = field.split(",").collect();
696 for item in list.iter() {
697 self.params.json[item.to_string().as_str()] = item.to_string().into();
698 }
699 self
700 }
701
702 fn location(&mut self, field: &str) -> &mut Self {
703 let list: Vec<&str> = field.split(",").collect();
704 for item in list.iter() {
705 self.params.location[item.to_string().as_str()] = item.to_string().into();
706 }
707 self
708 }
709
710 fn field(&mut self, field: &str) -> &mut Self {
711 let list: Vec<&str> = field.split(",").collect();
712 let join_table = if self.params.join_table.is_empty() {
713 self.params.table.clone()
714 } else {
715 self.params.join_table.clone()
716 };
717 for item in list.iter() {
718 let lower = item.to_lowercase();
719 let is_expr = lower.contains("count(")
720 || lower.contains("sum(")
721 || lower.contains("avg(")
722 || lower.contains("max(")
723 || lower.contains("min(")
724 || lower.contains("case ");
725 if is_expr {
726 self.params.fields[item.to_string().as_str()] = (*item).into();
727 } else if item.contains(" as ") {
728 let text = item.split(" as ").collect::<Vec<&str>>();
729 self.params.fields[item.to_string().as_str()] =
730 format!("{}.{} as {}", join_table, text[0], text[1]).into();
731 } else {
732 self.params.fields[item.to_string().as_str()] =
733 format!("{join_table}.{item}").into();
734 }
735 }
736 self
737 }
738
739 fn field_raw(&mut self, expr: &str) -> &mut Self {
740 self.params.fields[expr] = expr.into();
741 self
742 }
743
744 fn hidden(&mut self, name: &str) -> &mut Self {
745 let hidden: Vec<&str> = name.split(",").collect();
746
747 let fields_list = self.table_info(self.params.clone().table.as_str());
748 let mut data = array![];
749 for item in fields_list.members() {
750 let _ = data.push(object! {
751 "name":item["field"].as_str().unwrap_or("")
752 });
753 }
754
755 for item in data.members() {
756 let name = item["name"].as_str().unwrap_or("");
757 if !hidden.contains(&name) {
758 self.params.fields[name] = name.into();
759 }
760 }
761 self
762 }
763
764 fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
765 for f in field.split('|') {
766 if !super::sql_safety::validate_field_name(f) {
767 error!("Invalid field name: {}", f);
768 }
769 }
770 if !super::sql_safety::validate_compare_orator(compare) {
771 error!("Invalid compare operator: {}", compare);
772 }
773 let join_table = if self.params.join_table.is_empty() {
774 self.params.table.clone()
775 } else {
776 self.params.join_table.clone()
777 };
778 if value.is_boolean() {
779 let bool_val = value.as_bool().unwrap_or(false);
780 self.params
781 .where_and
782 .push(format!("{join_table}.{field} {compare} {bool_val}"));
783 return self;
784 }
785 match compare {
786 "between" => {
787 self.params.where_and.push(format!(
788 "{}.{} between '{}' AND '{}'",
789 join_table, field, value[0], value[1]
790 ));
791 }
792 "set" => {
793 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
794 let mut wheredata = vec![];
795 for item in list.iter() {
796 wheredata.push(format!(
797 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
798 ));
799 }
800 self.params
801 .where_and
802 .push(format!("({})", wheredata.join(" or ")));
803 }
804 "notin" => {
805 let mut text = String::new();
806 for item in value.members() {
807 text = format!("{text},'{item}'");
808 }
809 text = text.trim_start_matches(",").into();
810 self.params
811 .where_and
812 .push(format!("{join_table}.{field} not in ({text})"));
813 }
814 "is" => {
815 self.params
816 .where_and
817 .push(format!("{join_table}.{field} is {value}"));
818 }
819 "isnot" => {
820 self.params
821 .where_and
822 .push(format!("{join_table}.{field} is not {value}"));
823 }
824 "notlike" => {
825 self.params
826 .where_and
827 .push(format!("{join_table}.{field} not like '{value}'"));
828 }
829 "in" => {
830 if value.is_array() && value.is_empty() {
831 self.params.where_and.push("1=0".to_string());
832 return self;
833 }
834 let mut text = String::new();
835 if value.is_array() {
836 for item in value.members() {
837 text = format!("{text},'{item}'");
838 }
839 } else if value.is_null() {
840 text = format!("{text},null");
841 } else {
842 let value = value.as_str().unwrap_or("");
843
844 let value: Vec<&str> = value.split(",").collect();
845 for item in value.iter() {
846 text = format!("{text},'{item}'");
847 }
848 }
849 text = text.trim_start_matches(",").into();
850
851 self.params
852 .where_and
853 .push(format!("{join_table}.{field} {compare} ({text})"));
854 }
855 _ => {
856 self.params
857 .where_and
858 .push(format!("{join_table}.{field} {compare} '{value}'"));
859 }
860 }
861 self
862 }
863
864 fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
865 for f in field.split('|') {
866 if !super::sql_safety::validate_field_name(f) {
867 error!("Invalid field name: {}", f);
868 }
869 }
870 if !super::sql_safety::validate_compare_orator(compare) {
871 error!("Invalid compare operator: {}", compare);
872 }
873 let join_table = if self.params.join_table.is_empty() {
874 self.params.table.clone()
875 } else {
876 self.params.join_table.clone()
877 };
878
879 if value.is_boolean() {
880 let bool_val = value.as_bool().unwrap_or(false);
881 self.params
882 .where_or
883 .push(format!("{join_table}.{field} {compare} {bool_val}"));
884 return self;
885 }
886
887 match compare {
888 "between" => {
889 self.params.where_or.push(format!(
890 "{}.{} between '{}' AND '{}'",
891 join_table, field, value[0], value[1]
892 ));
893 }
894 "set" => {
895 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
896 let mut wheredata = vec![];
897 for item in list.iter() {
898 wheredata.push(format!(
899 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
900 ));
901 }
902 self.params
903 .where_or
904 .push(format!("({})", wheredata.join(" or ")));
905 }
906 "notin" => {
907 let mut text = String::new();
908 for item in value.members() {
909 text = format!("{text},'{item}'");
910 }
911 text = text.trim_start_matches(",").into();
912 self.params
913 .where_or
914 .push(format!("{join_table}.{field} not in ({text})"));
915 }
916 "is" => {
917 self.params
918 .where_or
919 .push(format!("{join_table}.{field} is {value}"));
920 }
921 "isnot" => {
922 self.params
923 .where_or
924 .push(format!("{join_table}.{field} is not {value}"));
925 }
926 "in" => {
927 if value.is_array() && value.is_empty() {
928 self.params.where_or.push("1=0".to_string());
929 return self;
930 }
931 let mut text = String::new();
932 if value.is_array() {
933 for item in value.members() {
934 text = format!("{text},'{item}'");
935 }
936 } else {
937 let value = value.as_str().unwrap_or("");
938 let value: Vec<&str> = value.split(",").collect();
939 for item in value.iter() {
940 text = format!("{text},'{item}'");
941 }
942 }
943 text = text.trim_start_matches(",").into();
944 self.params
945 .where_or
946 .push(format!("{join_table}.{field} {compare} ({text})"));
947 }
948 _ => {
949 self.params
950 .where_or
951 .push(format!("{join_table}.{field} {compare} '{value}'"));
952 }
953 }
954 self
955 }
956
957 fn where_raw(&mut self, expr: &str) -> &mut Self {
958 self.params.where_and.push(expr.to_string());
959 self
960 }
961
962 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
963 self.params
964 .where_and
965 .push(format!("\"{field}\" IN ({sub_sql})"));
966 self
967 }
968
969 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
970 self.params
971 .where_and
972 .push(format!("\"{field}\" NOT IN ({sub_sql})"));
973 self
974 }
975
976 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
977 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
978 self
979 }
980
981 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
982 self.params
983 .where_and
984 .push(format!("NOT EXISTS ({sub_sql})"));
985 self
986 }
987
988 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
989 self.params.where_column = format!(
990 "{}.{} {} {}.{}",
991 self.params.table, field_a, compare, self.params.table, field_b
992 );
993 self
994 }
995
996 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
997 self.params
998 .update_column
999 .push(format!("{field_a} = {compare}"));
1000 self
1001 }
1002
1003 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1004 self.params.page = page;
1005 self.params.limit = limit;
1006 self
1007 }
1008
1009 fn limit(&mut self, count: i32) -> &mut Self {
1010 self.params.limit_only = count;
1011 self
1012 }
1013
1014 fn column(&mut self, field: &str) -> JsonValue {
1015 self.field(field);
1016 let sql = self.params.select_sql();
1017
1018 if self.params.sql {
1019 return JsonValue::from(sql);
1020 }
1021 let (state, data) = self.query(sql.as_str());
1022 match state {
1023 true => {
1024 let mut list = array![];
1025 for item in data.members() {
1026 if self.params.json[field].is_empty() {
1027 let _ = list.push(item[field].clone());
1028 } else {
1029 let data =
1030 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1031 let _ = list.push(data);
1032 }
1033 }
1034 list
1035 }
1036 false => {
1037 array![]
1038 }
1039 }
1040 }
1041
1042 fn count(&mut self) -> JsonValue {
1043 self.params.fields = json::object! {};
1044 self.params.fields["count"] = "count(*) as count".to_string().into();
1045 let sql = self.params.select_sql();
1046 if self.params.sql {
1047 return JsonValue::from(sql.clone());
1048 }
1049 let (state, data) = self.query(sql.as_str());
1050 if state {
1051 data[0]["count"].clone()
1052 } else {
1053 JsonValue::from(0)
1054 }
1055 }
1056
1057 fn max(&mut self, field: &str) -> JsonValue {
1058 self.params.fields[field] = format!("max({field}) as {field}").into();
1059 let sql = self.params.select_sql();
1060 if self.params.sql {
1061 return JsonValue::from(sql.clone());
1062 }
1063 let (state, data) = self.query(sql.as_str());
1064 if state {
1065 if data.len() > 1 {
1066 return data.clone();
1067 }
1068 data[0][field].clone()
1069 } else {
1070 JsonValue::from(0)
1071 }
1072 }
1073
1074 fn min(&mut self, field: &str) -> JsonValue {
1075 self.params.fields[field] = format!("min({field}) as {field}").into();
1076 let sql = self.params.select_sql();
1077 if self.params.sql {
1078 return JsonValue::from(sql.clone());
1079 }
1080 let (state, data) = self.query(sql.as_str());
1081 if state {
1082 if data.len() > 1 {
1083 return data;
1084 }
1085 data[0][field].clone()
1086 } else {
1087 JsonValue::from(0)
1088 }
1089 }
1090
1091 fn sum(&mut self, field: &str) -> JsonValue {
1092 self.params.fields[field] = format!("sum({field}) as {field}").into();
1093 let sql = self.params.select_sql();
1094 if self.params.sql {
1095 return JsonValue::from(sql.clone());
1096 }
1097 let (state, data) = self.query(sql.as_str());
1098 match state {
1099 true => {
1100 if data.len() > 1 {
1101 return data;
1102 }
1103 data[0][field].clone()
1104 }
1105 false => JsonValue::from(0),
1106 }
1107 }
1108
1109 fn avg(&mut self, field: &str) -> JsonValue {
1110 self.params.fields[field] = format!("avg({field}) as {field}").into();
1111 let sql = self.params.select_sql();
1112 if self.params.sql {
1113 return JsonValue::from(sql.clone());
1114 }
1115 let (state, data) = self.query(sql.as_str());
1116 if state {
1117 if data.len() > 1 {
1118 return data;
1119 }
1120 data[0][field].clone()
1121 } else {
1122 JsonValue::from(0)
1123 }
1124 }
1125
1126 fn having(&mut self, expr: &str) -> &mut Self {
1127 self.params.having.push(expr.to_string());
1128 self
1129 }
1130
1131 fn select(&mut self) -> JsonValue {
1132 let sql = self.params.select_sql();
1133 if self.params.sql {
1134 return JsonValue::from(sql.clone());
1135 }
1136 let (state, mut data) = self.query(sql.as_str());
1137 match state {
1138 true => {
1139 for (field, _) in self.params.json.entries() {
1140 for item in data.members_mut() {
1141 if !item[field].is_empty() {
1142 let json = item[field].to_string();
1143 item[field] = match json::parse(&json) {
1144 Ok(e) => e,
1145 Err(_) => JsonValue::from(json),
1146 };
1147 }
1148 }
1149 }
1150 data.clone()
1151 }
1152 false => array![],
1153 }
1154 }
1155
1156 fn find(&mut self) -> JsonValue {
1157 self.params.page = 1;
1158 self.params.limit = 1;
1159 let sql = self.params.select_sql();
1160 if self.params.sql {
1161 return JsonValue::from(sql.clone());
1162 }
1163 let (state, mut data) = self.query(sql.as_str());
1164 match state {
1165 true => {
1166 if data.is_empty() {
1167 return object! {};
1168 }
1169 for (field, _) in self.params.json.entries() {
1170 if !data[0][field].is_empty() {
1171 let json = data[0][field].to_string();
1172 let json = json::parse(&json).unwrap_or(array![]);
1173 data[0][field] = json;
1174 } else {
1175 data[0][field] = array![];
1176 }
1177 }
1178 data[0].clone()
1179 }
1180 false => {
1181 object! {}
1182 }
1183 }
1184 }
1185
1186 fn value(&mut self, field: &str) -> JsonValue {
1187 self.params.fields = object! {};
1188 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1189 self.params.page = 1;
1190 self.params.limit = 1;
1191 let sql = self.params.select_sql();
1192 if self.params.sql {
1193 return JsonValue::from(sql.clone());
1194 }
1195 let (state, mut data) = self.query(sql.as_str());
1196 match state {
1197 true => {
1198 for (field, _) in self.params.json.entries() {
1199 if !data[0][field].is_empty() {
1200 let json = data[0][field].to_string();
1201 let json = json::parse(&json).unwrap_or(array![]);
1202 data[0][field] = json;
1203 } else {
1204 data[0][field] = array![];
1205 }
1206 }
1207 data[0][field].clone()
1208 }
1209 false => {
1210 if self.connection.debug {
1211 info!("{data:?}");
1212 }
1213 JsonValue::Null
1214 }
1215 }
1216 }
1217
1218 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1219 let fields_list = self.table_info(&self.params.table.clone());
1220 let mut fields = vec![];
1221 let mut values = vec![];
1222 if !self.params.autoinc && data["id"].is_empty() {
1223 let thread_id = format!("{:?}", std::thread::current().id());
1224 let thread_num: u64 = thread_id
1225 .trim_start_matches("ThreadId(")
1226 .trim_end_matches(")")
1227 .parse()
1228 .unwrap_or(0);
1229 data["id"] = format!(
1230 "{:X}{:X}",
1231 Local::now().timestamp_nanos_opt().unwrap_or(0),
1232 thread_num
1233 )
1234 .into();
1235 }
1236 for (field, value) in data.entries() {
1237 fields.push(format!("\"{}\"", field));
1238
1239 if value.is_string() {
1240 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1241 continue;
1242 } else if value.is_array() {
1243 if self.params.json[field].is_empty() {
1244 let array = value
1245 .members()
1246 .map(|x| x.as_str().unwrap_or(""))
1247 .collect::<Vec<&str>>()
1248 .join(",");
1249 values.push(format!("'{array}'"));
1250 } else {
1251 let json = value.to_string();
1252 let json = json.replace("'", "''");
1253 values.push(format!("'{json}'"));
1254 }
1255 continue;
1256 } else if value.is_object() {
1257 if self.params.json[field].is_empty() {
1258 values.push(format!("'{value}'"));
1259 } else {
1260 let json = value.to_string();
1261 let json = json.replace("'", "''");
1262 values.push(format!("'{json}'"));
1263 }
1264 continue;
1265 } else if value.is_number() {
1266 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1267 if col_type == "boolean" {
1268 let bool_val = value.as_i64().unwrap_or(0) != 0;
1269 values.push(format!("{bool_val}"));
1270 } else if col_type.contains("int") {
1271 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1272 } else {
1273 values.push(format!("{value}"));
1274 }
1275 continue;
1276 } else if value.is_boolean() || value.is_null() {
1277 values.push(format!("{value}"));
1278 continue;
1279 } else {
1280 values.push(format!("'{value}'"));
1281 continue;
1282 }
1283 }
1284 let fields = fields.join(",");
1285 let values = values.join(",");
1286
1287 let sql = format!(
1288 "INSERT INTO {} ({}) VALUES ({});",
1289 self.params.table, fields, values
1290 );
1291 if self.params.sql {
1292 return JsonValue::from(sql.clone());
1293 }
1294 let (state, ids) = self.execute(sql.as_str());
1295
1296 match state {
1297 true => match self.params.autoinc {
1298 true => ids.clone(),
1299 false => data["id"].clone(),
1300 },
1301 false => {
1302 let thread_id = format!("{:?}", thread::current().id());
1303 error!("添加失败: {thread_id} {ids:?} {sql}");
1304 JsonValue::from("")
1305 }
1306 }
1307 }
1308 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1309 let fields_list = self.table_info(&self.params.table.clone());
1310 let mut fields = String::new();
1311 if !self.params.autoinc && data[0]["id"].is_empty() {
1312 data[0]["id"] = "".into();
1313 }
1314 for (field, _) in data[0].entries() {
1315 fields = format!("{fields},\"{field}\"");
1316 }
1317 fields = fields.trim_start_matches(",").to_string();
1318
1319 let core_count = num_cpus::get();
1320 let mut p = pools::Pool::new(core_count * 4);
1321
1322 let autoinc = self.params.autoinc;
1323 for list in data.members() {
1324 let mut item = list.clone();
1325 let i = br_fields::str::Code::verification_code(3);
1326 let fields_list_new = fields_list.clone();
1327 p.execute(move |pcindex| {
1328 if !autoinc && item["id"].is_empty() {
1329 let id = format!(
1330 "{:X}{:X}{}",
1331 Local::now().timestamp_nanos_opt().unwrap_or(0),
1332 pcindex,
1333 i
1334 );
1335 item["id"] = id.into();
1336 }
1337 let mut values = "".to_string();
1338 for (field, value) in item.entries() {
1339 if value.is_string() {
1340 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1341 } else if value.is_number() {
1342 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1343 if col_type == "boolean" {
1344 let bool_val = value.as_i64().unwrap_or(0) != 0;
1345 values = format!("{values},{bool_val}");
1346 } else if col_type.contains("int") {
1347 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1348 } else {
1349 values = format!("{values},{value}");
1350 }
1351 } else if value.is_boolean() {
1352 values = format!("{values},{value}");
1353 continue;
1354 } else {
1355 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1356 }
1357 }
1358 values = format!("({})", values.trim_start_matches(","));
1359 array![item["id"].clone(), values]
1360 });
1361 }
1362 let (ids_list, mut values) = p.insert_all();
1363 values = values.trim_start_matches(",").to_string();
1364 let sql = format!(
1365 "INSERT INTO {} ({}) VALUES {};",
1366 self.params.table, fields, values
1367 );
1368
1369 if self.params.sql {
1370 return JsonValue::from(sql.clone());
1371 }
1372 let (state, data) = self.execute(sql.as_str());
1373 match state {
1374 true => match autoinc {
1375 true => data,
1376 false => JsonValue::from(ids_list),
1377 },
1378 false => {
1379 error!("insert_all: {data:?}");
1380 array![]
1381 }
1382 }
1383 }
1384 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1385 let fields_list = self.table_info(&self.params.table.clone());
1386 let mut fields = vec![];
1387 let mut values = vec![];
1388 if !self.params.autoinc && data["id"].is_empty() {
1389 let thread_id = format!("{:?}", std::thread::current().id());
1390 let thread_num: u64 = thread_id
1391 .trim_start_matches("ThreadId(")
1392 .trim_end_matches(")")
1393 .parse()
1394 .unwrap_or(0);
1395 data["id"] = format!(
1396 "{:X}{:X}",
1397 Local::now().timestamp_nanos_opt().unwrap_or(0),
1398 thread_num
1399 )
1400 .into();
1401 }
1402 for (field, value) in data.entries() {
1403 fields.push(format!("\"{}\"", field));
1404
1405 if value.is_string() {
1406 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1407 continue;
1408 } else if value.is_array() {
1409 if self.params.json[field].is_empty() {
1410 let array = value
1411 .members()
1412 .map(|x| x.as_str().unwrap_or(""))
1413 .collect::<Vec<&str>>()
1414 .join(",");
1415 values.push(format!("'{array}'"));
1416 } else {
1417 let json = value.to_string();
1418 let json = json.replace("'", "''");
1419 values.push(format!("'{json}'"));
1420 }
1421 continue;
1422 } else if value.is_object() {
1423 if self.params.json[field].is_empty() {
1424 values.push(format!("'{value}'"));
1425 } else {
1426 let json = value.to_string();
1427 let json = json.replace("'", "''");
1428 values.push(format!("'{json}'"));
1429 }
1430 continue;
1431 } else if value.is_number() {
1432 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1433 if col_type == "boolean" {
1434 let bool_val = value.as_i64().unwrap_or(0) != 0;
1435 values.push(format!("{bool_val}"));
1436 } else if col_type.contains("int") {
1437 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1438 } else {
1439 values.push(format!("{value}"));
1440 }
1441 continue;
1442 } else if value.is_boolean() || value.is_null() {
1443 values.push(format!("{value}"));
1444 continue;
1445 } else {
1446 values.push(format!("'{value}'"));
1447 continue;
1448 }
1449 }
1450
1451 let conflict_cols: Vec<String> = conflict_fields
1452 .iter()
1453 .map(|f| format!("\"{}\"", f))
1454 .collect();
1455
1456 let update_set: Vec<String> = fields
1457 .iter()
1458 .filter(|f| {
1459 let name = f.trim_matches('"');
1460 !conflict_fields.contains(&name) && name != "id"
1461 })
1462 .map(|f| format!("{f}=EXCLUDED.{f}"))
1463 .collect();
1464
1465 let fields_str = fields.join(",");
1466 let values_str = values.join(",");
1467
1468 let sql = format!(
1469 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1470 self.params.table,
1471 fields_str,
1472 values_str,
1473 conflict_cols.join(","),
1474 update_set.join(",")
1475 );
1476 if self.params.sql {
1477 return JsonValue::from(sql.clone());
1478 }
1479 let (state, result) = self.execute(sql.as_str());
1480 match state {
1481 true => match self.params.autoinc {
1482 true => result.clone(),
1483 false => data["id"].clone(),
1484 },
1485 false => {
1486 let thread_id = format!("{:?}", thread::current().id());
1487 error!("upsert失败: {thread_id} {result:?} {sql}");
1488 JsonValue::from("")
1489 }
1490 }
1491 }
1492 fn update(&mut self, data: JsonValue) -> JsonValue {
1493 let fields_list = self.table_info(&self.params.table.clone());
1494 let mut values = vec![];
1495 for (field, value) in data.entries() {
1496 if value.is_string() {
1497 values.push(format!(
1498 "\"{}\"='{}'",
1499 field,
1500 value.to_string().replace("'", "''")
1501 ));
1502 } else if value.is_number() {
1503 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1504 if col_type == "boolean" {
1505 let bool_val = value.as_i64().unwrap_or(0) != 0;
1506 values.push(format!("\"{field}\"= {bool_val}"));
1507 } else if col_type.contains("int") {
1508 values.push(format!(
1509 "\"{}\"= {}",
1510 field,
1511 value.as_f64().unwrap_or(0.0) as i64
1512 ));
1513 } else {
1514 values.push(format!("\"{field}\"= {value}"));
1515 }
1516 } else if value.is_array() {
1517 if self.params.json[field].is_empty() {
1518 let array = value
1519 .members()
1520 .map(|x| x.as_str().unwrap_or(""))
1521 .collect::<Vec<&str>>()
1522 .join(",");
1523 values.push(format!("\"{field}\"='{array}'"));
1524 } else {
1525 let json = value.to_string();
1526 let json = json.replace("'", "''");
1527 values.push(format!("\"{field}\"='{json}'"));
1528 }
1529 continue;
1530 } else if value.is_object() {
1531 if self.params.json[field].is_empty() {
1532 values.push(format!("\"{field}\"='{value}'"));
1533 } else {
1534 if value.is_empty() {
1535 values.push(format!("\"{field}\"=''"));
1536 continue;
1537 }
1538 let json = value.to_string();
1539 let json = json.replace("'", "''");
1540 values.push(format!("\"{field}\"='{json}'"));
1541 }
1542 continue;
1543 } else if value.is_boolean() {
1544 values.push(format!("\"{field}\"= {value}"));
1545 } else {
1546 values.push(format!("\"{field}\"=\"{value}\""));
1547 }
1548 }
1549
1550 for (field, value) in self.params.inc_dec.entries() {
1551 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1552 }
1553 if !self.params.update_column.is_empty() {
1554 values.extend(self.params.update_column.clone());
1555 }
1556 let values = values.join(",");
1557
1558 let sql = format!(
1559 "UPDATE {} SET {} {};",
1560 self.params.table.clone(),
1561 values,
1562 self.params.where_sql()
1563 );
1564 if self.params.sql {
1565 return JsonValue::from(sql.clone());
1566 }
1567 let (state, data) = self.execute(sql.as_str());
1568 if state {
1569 data
1570 } else {
1571 let thread_id = format!("{:?}", thread::current().id());
1572 error!("update: {thread_id} {data:?} {sql}");
1573 0.into()
1574 }
1575 }
1576 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1577 let fields_list = self.table_info(&self.params.table.clone());
1578 let mut values = vec![];
1579
1580 let mut ids = vec![];
1581 for (field, _) in data[0].entries() {
1582 if field == "id" {
1583 continue;
1584 }
1585 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1586 let mut fields = vec![];
1587 for row in data.members() {
1588 let value = row[field].clone();
1589 let id = row["id"].clone();
1590 ids.push(id.clone());
1591 if value.is_string() {
1592 fields.push(format!(
1593 "WHEN '{}' THEN '{}'",
1594 id,
1595 value.to_string().replace("'", "''")
1596 ));
1597 } else if value.is_array() || value.is_object() {
1598 if self.params.json[field].is_empty() {
1599 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1600 } else {
1601 let json = value.to_string();
1602 let json = json.replace("'", "''");
1603 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1604 }
1605 continue;
1606 } else if value.is_number() {
1607 if col_type == "boolean" {
1608 let bool_val = value.as_i64().unwrap_or(0) != 0;
1609 fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1610 } else {
1611 fields.push(format!("WHEN '{id}' THEN {value}"));
1612 }
1613 } else if value.is_boolean() || value.is_null() {
1614 fields.push(format!("WHEN '{id}' THEN {value}"));
1615 } else {
1616 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1617 }
1618 }
1619 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1620 }
1621 self.where_and("id", "in", ids.into());
1622 for (field, value) in self.params.inc_dec.entries() {
1623 values.push(format!("{} = {}", field, value.to_string().clone()));
1624 }
1625
1626 let values = values.join(",");
1627 let sql = format!(
1628 "UPDATE {} SET {} {} {};",
1629 self.params.table.clone(),
1630 values,
1631 self.params.where_sql(),
1632 self.params.page_limit_sql()
1633 );
1634 if self.params.sql {
1635 return JsonValue::from(sql.clone());
1636 }
1637 let (state, data) = self.execute(sql.as_str());
1638 if state {
1639 data
1640 } else {
1641 error!("update_all: {data:?}");
1642 JsonValue::from(0)
1643 }
1644 }
1645
1646 fn delete(&mut self) -> JsonValue {
1647 let sql = format!(
1648 "delete FROM {} {} {};",
1649 self.params.table.clone(),
1650 self.params.where_sql(),
1651 self.params.page_limit_sql()
1652 );
1653 if self.params.sql {
1654 return JsonValue::from(sql.clone());
1655 }
1656 let (state, data) = self.execute(sql.as_str());
1657 match state {
1658 true => data,
1659 false => {
1660 error!("delete 失败>>> {data:?}");
1661 JsonValue::from(0)
1662 }
1663 }
1664 }
1665
1666 fn transaction(&mut self) -> bool {
1667 let thread_id = format!("{:?}", thread::current().id());
1668 let key = format!("{}{}", self.default, thread_id);
1669
1670 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1671 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1672 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1673 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1674 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1675 return true;
1676 }
1677
1678 let mut conn = match self.client.get_connect_for_transaction() {
1679 Ok(c) => c,
1680 Err(e) => {
1681 error!("获取事务连接失败: {e}");
1682 return false;
1683 }
1684 };
1685
1686 if !conn.is_valid() {
1687 error!("事务连接无效");
1688 self.client.release_transaction_conn();
1689 return false;
1690 }
1691
1692 if let Err(e) = conn.execute("START TRANSACTION") {
1693 error!("启动事务失败: {e}");
1694 self.client.release_transaction_conn();
1695 return false;
1696 }
1697
1698 if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1699 error!("设置事务隔离级别失败: {e}");
1700 let _ = conn.execute("ROLLBACK");
1701 self.client.release_transaction_conn();
1702 return false;
1703 }
1704
1705 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1706 true
1707 }
1708 fn commit(&mut self) -> bool {
1709 let thread_id = format!("{:?}", thread::current().id());
1710 let key = format!("{}{}", self.default, thread_id);
1711
1712 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1713 error!("commit: 没有活跃的事务");
1714 return false;
1715 }
1716
1717 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1718 if depth > 1 {
1719 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1720 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1721 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1722 return true;
1723 }
1724
1725 let commit_result =
1726 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1727
1728 let success = match commit_result {
1729 Some(Ok(_)) => true,
1730 Some(Err(e)) => {
1731 error!("提交事务失败: {e}");
1732 false
1733 }
1734 None => {
1735 error!("提交事务失败: 未找到连接");
1736 false
1737 }
1738 };
1739
1740 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1741 self.client.release_transaction_conn();
1742 success
1743 }
1744
1745 fn rollback(&mut self) -> bool {
1746 let thread_id = format!("{:?}", thread::current().id());
1747 let key = format!("{}{}", self.default, thread_id);
1748
1749 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1750 error!("rollback: 没有活跃的事务");
1751 return false;
1752 }
1753
1754 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1755 if depth > 1 {
1756 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1757 let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1758 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1759 return true;
1760 }
1761
1762 let rollback_result =
1763 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1764
1765 let success = match rollback_result {
1766 Some(Ok(_)) => true,
1767 Some(Err(e)) => {
1768 error!("回滚失败: {e}");
1769 false
1770 }
1771 None => {
1772 error!("回滚失败: 未找到连接");
1773 false
1774 }
1775 };
1776
1777 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1778 self.client.release_transaction_conn();
1779 success
1780 }
1781
1782 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1783 let (state, data) = self.query(sql);
1784 match state {
1785 true => Ok(data),
1786 false => Err(data.to_string()),
1787 }
1788 }
1789
1790 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1791 let (state, data) = self.execute(sql);
1792 match state {
1793 true => Ok(data),
1794 false => Err(data.to_string()),
1795 }
1796 }
1797
1798 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1799 self.params.inc_dec[field] = format!("{field} + {num}").into();
1800 self
1801 }
1802
1803 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1804 self.params.inc_dec[field] = format!("{field} - {num}").into();
1805 self
1806 }
1807 fn buildsql(&mut self) -> String {
1808 self.fetch_sql();
1809 let sql = self.select().to_string();
1810 format!("( {} ) {}", sql, self.params.table)
1811 }
1812
1813 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1814 for field in fields {
1815 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1816 }
1817 self
1818 }
1819
1820 fn join(
1821 &mut self,
1822 main_table: &str,
1823 main_fields: &str,
1824 right_table: &str,
1825 right_fields: &str,
1826 ) -> &mut Self {
1827 let main_table = if main_table.is_empty() {
1828 self.params.table.clone()
1829 } else {
1830 main_table.to_string()
1831 };
1832 self.params.join_table = right_table.to_string();
1833 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1834 self
1835 }
1836
1837 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1838 let main_fields = if main_fields.is_empty() {
1839 "id"
1840 } else {
1841 main_fields
1842 };
1843 let second_fields = if second_fields.is_empty() {
1844 self.params.table.clone()
1845 } else {
1846 second_fields.to_string().clone()
1847 };
1848 let sec_table_name = format!("{}{}", table, "_2");
1849 let second_table = format!("{} {}", table, sec_table_name.clone());
1850 self.params.join_table = sec_table_name.clone();
1851 self.params.join.push(format!(
1852 " INNER JOIN {} ON {}.{} = {}.{}",
1853 second_table, self.params.table, main_fields, sec_table_name, second_fields
1854 ));
1855 self
1856 }
1857
1858 fn join_right(
1859 &mut self,
1860 main_table: &str,
1861 main_fields: &str,
1862 right_table: &str,
1863 right_fields: &str,
1864 ) -> &mut Self {
1865 let main_table = if main_table.is_empty() {
1866 self.params.table.clone()
1867 } else {
1868 main_table.to_string()
1869 };
1870 self.params.join_table = right_table.to_string();
1871 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1872 self
1873 }
1874
1875 fn join_full(
1876 &mut self,
1877 main_table: &str,
1878 main_fields: &str,
1879 right_table: &str,
1880 right_fields: &str,
1881 ) -> &mut Self {
1882 let main_table = if main_table.is_empty() {
1883 self.params.table.clone()
1884 } else {
1885 main_table.to_string()
1886 };
1887 self.params.join_table = right_table.to_string();
1888 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1889 self
1890 }
1891
1892 fn union(&mut self, sub_sql: &str) -> &mut Self {
1893 self.params.unions.push(format!("UNION {sub_sql}"));
1894 self
1895 }
1896
1897 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1898 self.params.unions.push(format!("UNION ALL {sub_sql}"));
1899 self
1900 }
1901
1902 fn lock_for_update(&mut self) -> &mut Self {
1903 self.params.lock_mode = "FOR UPDATE".to_string();
1904 self
1905 }
1906
1907 fn lock_for_share(&mut self) -> &mut Self {
1908 self.params.lock_mode = "FOR SHARE".to_string();
1909 self
1910 }
1911}