1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use chrono::Local;
7use json::{array, object, JsonValue};
8use log::{error, info};
9use std::thread;
/// PostgreSQL backend handle: connection settings, query-builder state and
/// the connection pool used to run statements.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Name that is concatenated with the current thread id to form the key
    /// used to look up an open transaction.
    pub default: String,
    /// Query-builder state accumulated by the chained builder methods.
    pub params: Params,
    /// Connection pool obtained from `br_pgsql`.
    pub client: Pools,
}
19
20impl Pgsql {
21 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
22 let port = connection
23 .hostport
24 .parse::<i32>()
25 .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
26
27 let cp_connection = connection.clone();
28 let config = object! {
29 debug: cp_connection.debug,
30 username: cp_connection.username,
31 userpass: cp_connection.userpass,
32 database: cp_connection.database,
33 hostname: cp_connection.hostname,
34 hostport: port,
35 charset: cp_connection.charset.str(),
36 };
37 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
38
39 let pools = pgsql.pools()?;
40 Ok(Self {
41 connection,
42 default: default.clone(),
43 params: Params::default("pgsql"),
44 client: pools,
45 })
46 }
47
48 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
49 let thread_id = format!("{:?}", thread::current().id());
50 let key = format!("{}{}", self.default, thread_id);
51
52 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
53 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
54
55 match result {
56 Some(Ok(e)) => {
57 if self.connection.debug {
58 info!("查询成功: {} {}", thread_id.clone(), sql);
59 }
60 (true, e.rows)
61 }
62 Some(Err(e)) => {
63 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
64 (false, JsonValue::from(e.to_string()))
65 }
66 None => {
67 error!("事务查询失败: 未找到事务连接 {thread_id}");
68 (false, JsonValue::from("未找到事务连接"))
69 }
70 }
71 } else {
72 let mut guard = match self.client.get_guard() {
73 Ok(g) => g,
74 Err(e) => {
75 error!(
76 "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
77 );
78 return (false, JsonValue::from(e.to_string()));
79 }
80 };
81
82 let res = guard.conn().query(sql);
83 match res {
84 Ok(e) => {
85 if self.connection.debug {
86 info!("查询成功: {} {}", thread_id.clone(), sql);
87 }
88 (true, e.rows)
89 }
90 Err(e) => {
91 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
92 (false, JsonValue::from(e.to_string()))
93 }
94 }
95 }
96 }
97 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
98 let thread_id = format!("{:?}", thread::current().id());
99 let key = format!("{}{}", self.default, thread_id);
100
101 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
102 let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
103
104 match result {
105 Some(Ok(e)) => {
106 if self.connection.debug {
107 info!("提交成功: {} {}", thread_id.clone(), sql);
108 }
109 if sql.contains("INSERT") {
110 (true, e.rows)
111 } else {
112 (true, e.affect_count.into())
113 }
114 }
115 Some(Err(e)) => {
116 error!("事务提交失败: {thread_id} {e} {sql}");
117 (false, JsonValue::from(e.to_string()))
118 }
119 None => {
120 error!("事务执行失败: 未找到事务连接 {thread_id}");
121 (false, JsonValue::from("未找到事务连接"))
122 }
123 }
124 } else {
125 let mut guard = match self.client.get_guard() {
126 Ok(g) => g,
127 Err(e) => {
128 error!(
129 "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
130 );
131 return (false, JsonValue::from(e.to_string()));
132 }
133 };
134
135 let res = guard.conn().execute(sql);
136 match res {
137 Ok(e) => {
138 if self.connection.debug {
139 info!("提交成功: {} {}", thread_id.clone(), sql);
140 }
141 if sql.contains("INSERT") {
142 (true, e.rows)
143 } else {
144 (true, e.affect_count.into())
145 }
146 }
147 Err(e) => {
148 error!("非事务提交失败: {thread_id} {e} {sql}");
149 (false, JsonValue::from(e.to_string()))
150 }
151 }
152 }
153 }
154}
155
156impl DbMode for Pgsql {
157 fn database_tables(&mut self) -> JsonValue {
158 let sql = "SHOW TABLES".to_string();
159 match self.sql(sql.as_str()) {
160 Ok(e) => {
161 let mut list = vec![];
162 for item in e.members() {
163 for (_, value) in item.entries() {
164 list.push(value.clone());
165 }
166 }
167 list.into()
168 }
169 Err(_) => {
170 array![]
171 }
172 }
173 }
174
175 fn database_create(&mut self, name: &str) -> bool {
176 let sql = format!("CREATE DATABASE {name}");
177
178 let (state, data) = self.execute(sql.as_str());
179 match state {
180 true => data.as_bool().unwrap_or(true),
181 false => {
182 error!("创建数据库失败: {data:?}");
183 false
184 }
185 }
186 }
187}
188
189impl Mode for Pgsql {
190 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
191 let mut sql = String::new();
192 let mut comments = vec![];
193
194 if !options.table_unique.is_empty() {
195 let full_name = format!(
196 "{}_unique_{}",
197 options.table_name,
198 options.table_unique.join("_")
199 );
200 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
201 let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
202 let unique = format!(
203 "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
204 name,
205 options.table_name,
206 options.table_unique.join(",")
207 );
208 comments.push(unique);
209 }
210
211 for row in options.table_index.iter() {
212 let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
213 let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
214 let name = format!("{}_index_{}", options.table_name, &md5[..16]);
215 let index = format!(
216 "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
217 name,
218 options.table_name,
219 row.join(",")
220 );
221 comments.push(index);
222 }
223
224 for (name, field) in options.table_fields.entries_mut() {
225 field["table_name"] = options.table_name.clone().into();
226 let row = br_fields::field("pgsql", name, field.clone());
227 let rows = row.split("comment").collect::<Vec<&str>>();
228 if rows.len() > 1 {
229 comments.push(format!(
230 "COMMENT ON COLUMN {}.\"{}\" IS {};",
231 options.table_name, name, rows[1]
232 ));
233 }
234 sql = format!("{} {},\r\n", sql, rows[0]);
235 }
236
237 let primary_key = format!(
238 "CONSTRAINT {}_{} PRIMARY KEY ({})",
239 options.table_name, options.table_key, options.table_key
240 );
241 let sql = format!(
242 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
243 options.table_name,
244 sql.trim_end_matches(",\r\n"),
245 primary_key
246 );
247 comments.insert(0, sql);
248
249 for (_name, field) in options.table_fields.entries() {
250 let _ = field["mode"].as_str();
251 }
252
253 if self.params.sql {
254 let info = comments.join("\r\n");
255 return JsonValue::from(info);
256 }
257 for comment in comments {
258 let (state, _) = self.execute(comment.as_str());
259 match state {
260 true => {}
261 false => {
262 return JsonValue::from(state);
263 }
264 }
265 }
266 JsonValue::from(true)
267 }
268
    /// Brings an existing table in line with `options`.
    ///
    /// Compares the columns reported by `table_info` with
    /// `options.table_fields`, and the indexes listed in `pg_indexes` with
    /// `options.table_unique` / `options.table_index`, then builds the
    /// statements needed to add, drop or alter columns and indexes.
    ///
    /// Returns the joined statements when `params.sql` is set; otherwise `-1`
    /// when nothing needs changing, `0` when a statement fails and `1` when
    /// every statement succeeds.
    fn table_update(&mut self, options: TableOptions) -> JsonValue {
        let fields_list = self.table_info(&options.table_name);
        let mut put = vec![];
        let mut add = vec![];
        let mut del = vec![];
        let mut comments = vec![];

        // Columns present in the database but without a definition in
        // `options` are scheduled for removal.
        for (key, _) in fields_list.entries() {
            if options.table_fields[key].is_empty() {
                del.push(key);
            }
        }
        for (name, field) in options.table_fields.entries() {
            if !fields_list[name].is_empty() {
                let old_info = &fields_list[name];
                let new_field_sql = br_fields::field("pgsql", name, field.clone());

                let old_comment = old_info["comment"].as_str().unwrap_or("");
                let old_type = old_info["type"].as_str().unwrap_or("");

                // Everything after the first "--" in the generated column SQL
                // is treated as the new comment.
                let new_comment = if let Some(idx) = new_field_sql.find("--") {
                    new_field_sql[idx + 2..].trim()
                } else {
                    ""
                };

                // Comments are '|'-separated. For "code|" comments only the
                // first four parts are compared; for other comments of equal
                // part count the "true"/"false" parts are ignored. Anything
                // else is compared verbatim.
                let comment_matches =
                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
                        let old_parts: Vec<&str> = old_comment.split('|').collect();
                        let new_parts: Vec<&str> = new_comment.split('|').collect();
                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
                            old_parts[..4] == new_parts[..4]
                        } else {
                            old_comment == new_comment
                        }
                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
                        let old_parts: Vec<&str> = old_comment.split('|').collect();
                        let new_parts: Vec<&str> = new_comment.split('|').collect();
                        if old_parts.len() >= 2
                            && new_parts.len() >= 2
                            && old_parts.len() == new_parts.len()
                        {
                            let old_filtered: Vec<&str> = old_parts
                                .iter()
                                .filter(|v| **v != "true" && **v != "false")
                                .copied()
                                .collect();
                            let new_filtered: Vec<&str> = new_parts
                                .iter()
                                .filter(|v| **v != "true" && **v != "false")
                                .copied()
                                .collect();
                            old_filtered == new_filtered
                        } else {
                            old_comment == new_comment
                        }
                    } else {
                        old_comment == new_comment
                    };

                // The second whitespace-separated token of the generated SQL
                // is taken as the new column type.
                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
                let new_type = if sql_parts.len() > 1 {
                    sql_parts[1].to_lowercase()
                } else {
                    String::new()
                };

                // Map the type name reported by information_schema onto the
                // spelling used in the generated DDL.
                let type_matches = match old_type {
                    "integer" => {
                        new_type.contains("int")
                            && !new_type.contains("bigint")
                            && !new_type.contains("smallint")
                    }
                    "bigint" => new_type.contains("bigint"),
                    "smallint" => new_type.contains("smallint"),
                    "boolean" => new_type.contains("boolean"),
                    "text" => new_type.contains("text"),
                    "character varying" => new_type.contains("varchar"),
                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
                    "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
                    "double precision" => {
                        new_type.contains("double") || new_type.contains("float8")
                    }
                    "real" => new_type.contains("real") || new_type.contains("float4"),
                    "timestamp without time zone" | "timestamp with time zone" => {
                        new_type.contains("timestamp")
                    }
                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
                    "time without time zone" | "time with time zone" => {
                        new_type.contains("time") && !new_type.contains("timestamp")
                    }
                    "json" | "jsonb" => new_type.contains("json"),
                    "uuid" => new_type.contains("uuid"),
                    "bytea" => new_type.contains("bytea"),
                    _ => old_type == new_type,
                };

                // Unchanged column: nothing to emit.
                if type_matches && comment_matches {
                    continue;
                }

                log::debug!(
                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
                    options.table_name,
                    name,
                    type_matches,
                    old_type,
                    new_type,
                    comment_matches
                );
                put.push(name);
            } else {
                add.push(name);
            }
        }

        // New columns: ADD plus an optional COMMENT statement.
        for name in add.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            let rows = row.split("--").collect::<Vec<&str>>();
            comments.push(format!(
                r#"ALTER TABLE "{}" add {};"#,
                options.table_name,
                rows[0].trim()
            ));
            if rows.len() > 1 {
                comments.push(format!(
                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
                    options.table_name,
                    name,
                    rows[1].trim()
                ));
            }
        }
        // Removed columns.
        for name in del.iter() {
            comments.push(format!(
                "ALTER TABLE {} DROP {};\r\n",
                options.table_name, name
            ));
        }
        // Changed columns: type, default, nullability and comment.
        for name in put.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            let rows = row.split("--").collect::<Vec<&str>>();

            // sql[1] is the column type token.
            // NOTE(review): indexing panics if the generated SQL has fewer
            // than two space-separated tokens.
            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();

            if sql[1].contains("BOOLEAN") {
                // The existing default is dropped before the column is
                // converted with an explicit `::boolean` cast.
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
                    options.table_name, name
                );
                comments.push(text.clone());
                // `{1}` re-uses the column name as the USING expression.
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
                    options.table_name, name, sql[1]
                );
                comments.push(text.clone());
            } else {
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
                    options.table_name, name, sql[1]
                );
                comments.push(text.clone());
            };

            // Re-apply the default: everything after " default " (9 bytes) in
            // the generated column SQL.
            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
                let default_value = rows[0][default_pos + 9..].trim();
                if !default_value.is_empty() {
                    comments.push(format!(
                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
                        options.table_name, name, default_value
                    ));
                }
            }
            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
                .as_str()
                .unwrap_or("YES");
            let old_is_required = old_is_nullable == "NO";

            // A NOT NULL constraint on any altered column other than the
            // primary key is dropped.
            if old_is_required && name != options.table_key {
                comments.push(format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
                    options.table_name, name
                ));
            }

            if rows.len() > 1 {
                comments.push(format!(
                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
                    options.table_name,
                    name,
                    rows[1].trim()
                ));
            }
        }

        // Existing indexes, bucketed by kind. After the desired indexes are
        // processed below, whatever is left in `unique_new` / `index_new` is
        // no longer wanted and gets dropped.
        let mut unique_new = vec![];
        let mut index_new = vec![];
        let mut primary_key = vec![];
        let (_, index_list) = self.query(
            format!(
                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
                options.table_name
            )
            .as_str(),
        );
        for item in index_list.members() {
            let key_name = item["indexname"].as_str().unwrap_or("");
            let indexdef = item["indexdef"].to_string();

            // The index named `{table}_{key}` backs the primary key
            // constraint created by `table_create`.
            if indexdef.contains(
                format!(
                    "CREATE UNIQUE INDEX {}_{} ON",
                    options.table_name, options.table_key
                )
                .as_str(),
            ) {
                primary_key.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE UNIQUE INDEX") {
                unique_new.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE INDEX") {
                index_new.push(key_name.to_string());
                continue;
            }
        }

        if !options.table_unique.is_empty() {
            let full_name = format!(
                "{}_unique_{}",
                options.table_name,
                options.table_unique.join("_")
            );
            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
            let unique = format!(
                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
                name,
                options.table_name,
                options.table_unique.join(",")
            );
            if !unique_new.contains(&name) {
                comments.push(unique);
            }
            // Still wanted, so keep it out of the drop list.
            unique_new.retain(|x| *x != name);
        }

        for row in options.table_index.iter() {
            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
            let index = format!(
                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
                name,
                options.table_name,
                row.join(",")
            );
            if !index_new.contains(&name) {
                comments.push(index);
            }
            // Still wanted, so keep it out of the drop list.
            index_new.retain(|x| *x != name);
        }

        // Drop leftover unique indexes; names ending in "_pkey" are skipped.
        for item in unique_new {
            if item.ends_with("_pkey") {
                continue;
            }
            if item.starts_with("unique_") {
                comments.push(format!(
                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
                    options.table_name,
                    item.clone()
                ));
            } else {
                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
            }
        }
        // Drop leftover plain indexes.
        for item in index_new {
            if item.ends_with("_pkey") {
                continue;
            }
            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
        }

        if self.params.sql {
            return JsonValue::from(comments.join(""));
        }

        // -1 signals "nothing to do".
        if comments.is_empty() {
            return JsonValue::from(-1);
        }

        for item in comments.iter() {
            let (state, res) = self.execute(item.as_str());
            match state {
                true => {}
                false => {
                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
                    return JsonValue::from(0);
                }
            }
        }
        JsonValue::from(1)
    }
580
581 fn table_info(&mut self, table: &str) -> JsonValue {
582 let sql = format!(
583 "SELECT COL.COLUMN_NAME,
584 COL.DATA_TYPE,
585 COL.IS_NULLABLE,
586 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
587 LEFT JOIN
588 pg_catalog.pg_description DESCRIPTION
589 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
590 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
591 let (state, data) = self.query(sql.as_str());
592 let mut list = object! {};
593 if state {
594 for item in data.members() {
595 let mut row = object! {};
596 row["field"] = item["column_name"].clone();
597 row["comment"] = item["comment"].clone();
598 row["type"] = item["data_type"].clone();
599 row["is_nullable"] = item["is_nullable"].clone();
600 if let Some(field_name) = row["field"].as_str() {
601 list[field_name] = row.clone();
602 }
603 }
604 list
605 } else {
606 list
607 }
608 }
609
610 fn table_is_exist(&mut self, name: &str) -> bool {
611 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
612 let (state, data) = self.query(sql.as_str());
613 match state {
614 true => {
615 for item in data.members() {
616 if item.has_key("exists") {
617 return item["exists"].as_bool().unwrap_or(false);
618 }
619 }
620 false
621 }
622 false => false,
623 }
624 }
625
626 fn table(&mut self, name: &str) -> &mut Pgsql {
627 self.params = Params::default(self.connection.mode.str().as_str());
628 let table_name = format!("{}{}", self.connection.prefix, name);
629 if !super::sql_safety::validate_table_name(&table_name) {
630 error!("Invalid table name: {}", name);
631 }
632 self.params.table = table_name.clone();
633 self.params.join_table = table_name;
634 self
635 }
636
637 fn change_table(&mut self, name: &str) -> &mut Self {
638 self.params.join_table = name.to_string();
639 self
640 }
641
642 fn autoinc(&mut self) -> &mut Self {
643 self.params.autoinc = true;
644 self
645 }
646
647 fn fetch_sql(&mut self) -> &mut Self {
648 self.params.sql = true;
649 self
650 }
651
652 fn order(&mut self, field: &str, by: bool) -> &mut Self {
653 self.params.order[field] = {
654 if by {
655 "DESC"
656 } else {
657 "ASC"
658 }
659 }
660 .into();
661 self
662 }
663
664 fn group(&mut self, field: &str) -> &mut Self {
665 let fields: Vec<&str> = field.split(",").collect();
666 for field in fields.iter() {
667 let field = field.to_string();
668 self.params.group[field.as_str()] = field.clone().into();
669 self.params.fields[field.as_str()] = field.clone().into();
670 }
671 self
672 }
673
674 fn distinct(&mut self) -> &mut Self {
675 self.params.distinct = true;
676 self
677 }
678
679 fn json(&mut self, field: &str) -> &mut Self {
680 let list: Vec<&str> = field.split(",").collect();
681 for item in list.iter() {
682 self.params.json[item.to_string().as_str()] = item.to_string().into();
683 }
684 self
685 }
686
687 fn location(&mut self, field: &str) -> &mut Self {
688 let list: Vec<&str> = field.split(",").collect();
689 for item in list.iter() {
690 self.params.location[item.to_string().as_str()] = item.to_string().into();
691 }
692 self
693 }
694
695 fn field(&mut self, field: &str) -> &mut Self {
696 let list: Vec<&str> = field.split(",").collect();
697 let join_table = if self.params.join_table.is_empty() {
698 self.params.table.clone()
699 } else {
700 self.params.join_table.clone()
701 };
702 for item in list.iter() {
703 if item.contains(" as ") {
704 let text = item.split(" as ").collect::<Vec<&str>>();
705 if text[0].contains("count(") {
706 self.params.fields[item.to_string().as_str()] =
707 format!("{} as {}", text[0], text[1]).into();
708 } else {
709 self.params.fields[item.to_string().as_str()] =
710 format!("{}.{} as {}", join_table, text[0], text[1]).into();
711 }
712 } else {
713 self.params.fields[item.to_string().as_str()] =
714 format!("{join_table}.{item}").into();
715 }
716 }
717 self
718 }
719
720 fn hidden(&mut self, name: &str) -> &mut Self {
721 let hidden: Vec<&str> = name.split(",").collect();
722
723 let fields_list = self.table_info(self.params.clone().table.as_str());
724 let mut data = array![];
725 for item in fields_list.members() {
726 let _ = data.push(object! {
727 "name":item["field"].as_str().unwrap_or("")
728 });
729 }
730
731 for item in data.members() {
732 let name = item["name"].as_str().unwrap_or("");
733 if !hidden.contains(&name) {
734 self.params.fields[name] = name.into();
735 }
736 }
737 self
738 }
739
740 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
741 let join_table = if self.params.join_table.is_empty() {
742 self.params.table.clone()
743 } else {
744 self.params.join_table.clone()
745 };
746 if value.is_boolean() {
747 if value.as_bool().unwrap_or(false) {
748 value = 1.into();
749 } else {
750 value = 0.into();
751 }
752 }
753 match compare {
754 "between" => {
755 self.params.where_and.push(format!(
756 "{}.{} between '{}' AND '{}'",
757 join_table, field, value[0], value[1]
758 ));
759 }
760 "set" => {
761 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
762 let mut wheredata = vec![];
763 for item in list.iter() {
764 wheredata.push(format!(
765 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
766 ));
767 }
768 self.params
769 .where_and
770 .push(format!("({})", wheredata.join(" or ")));
771 }
772 "notin" => {
773 let mut text = String::new();
774 for item in value.members() {
775 text = format!("{text},'{item}'");
776 }
777 text = text.trim_start_matches(",").into();
778 self.params
779 .where_and
780 .push(format!("{join_table}.{field} not in ({text})"));
781 }
782 "is" => {
783 self.params
784 .where_and
785 .push(format!("{join_table}.{field} is {value}"));
786 }
787 "isnot" => {
788 self.params
789 .where_and
790 .push(format!("{join_table}.{field} is not {value}"));
791 }
792 "notlike" => {
793 self.params
794 .where_and
795 .push(format!("{join_table}.{field} not like '{value}'"));
796 }
797 "in" => {
798 let mut text = String::new();
799 if value.is_array() {
800 for item in value.members() {
801 text = format!("{text},'{item}'");
802 }
803 } else if value.is_null() {
804 text = format!("{text},null");
805 } else {
806 let value = value.as_str().unwrap_or("");
807
808 let value: Vec<&str> = value.split(",").collect();
809 for item in value.iter() {
810 text = format!("{text},'{item}'");
811 }
812 }
813 text = text.trim_start_matches(",").into();
814
815 self.params
816 .where_and
817 .push(format!("{join_table}.{field} {compare} ({text})"));
818 }
819 _ => {
820 self.params
821 .where_and
822 .push(format!("{join_table}.{field} {compare} '{value}'"));
823 }
824 }
825 self
826 }
827
828 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
829 let join_table = if self.params.join_table.is_empty() {
830 self.params.table.clone()
831 } else {
832 self.params.join_table.clone()
833 };
834
835 if value.is_boolean() {
836 if value.as_bool().unwrap_or(false) {
837 value = 1.into();
838 } else {
839 value = 0.into();
840 }
841 }
842
843 match compare {
844 "between" => {
845 self.params.where_or.push(format!(
846 "{}.{} between '{}' AND '{}'",
847 join_table, field, value[0], value[1]
848 ));
849 }
850 "set" => {
851 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
852 let mut wheredata = vec![];
853 for item in list.iter() {
854 wheredata.push(format!(
855 "'{item}' = ANY (string_to_array({join_table}.{field},','))"
856 ));
857 }
858 self.params
859 .where_or
860 .push(format!("({})", wheredata.join(" or ")));
861 }
862 "notin" => {
863 let mut text = String::new();
864 for item in value.members() {
865 text = format!("{text},'{item}'");
866 }
867 text = text.trim_start_matches(",").into();
868 self.params
869 .where_or
870 .push(format!("{join_table}.{field} not in ({text})"));
871 }
872 "is" => {
873 self.params
874 .where_or
875 .push(format!("{join_table}.{field} is {value}"));
876 }
877 "isnot" => {
878 self.params
879 .where_or
880 .push(format!("{join_table}.{field} is not {value}"));
881 }
882 "in" => {
883 let mut text = String::new();
884 if value.is_array() {
885 for item in value.members() {
886 text = format!("{text},'{item}'");
887 }
888 } else {
889 let value = value.as_str().unwrap_or("");
890 let value: Vec<&str> = value.split(",").collect();
891 for item in value.iter() {
892 text = format!("{text},'{item}'");
893 }
894 }
895 text = text.trim_start_matches(",").into();
896 self.params
897 .where_or
898 .push(format!("{join_table}.{field} {compare} ({text})"));
899 }
900 _ => {
901 self.params
902 .where_or
903 .push(format!("{join_table}.{field} {compare} '{value}'"));
904 }
905 }
906 self
907 }
908
909 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
910 self.params.where_column = format!(
911 "{}.{} {} {}.{}",
912 self.params.table, field_a, compare, self.params.table, field_b
913 );
914 self
915 }
916
917 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
918 self.params
919 .update_column
920 .push(format!("{field_a} = {compare}"));
921 self
922 }
923
924 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
925 self.params.page = page;
926 self.params.limit = limit;
927 self
928 }
929
930 fn column(&mut self, field: &str) -> JsonValue {
931 self.field(field);
932 let sql = self.params.select_sql();
933
934 if self.params.sql {
935 return JsonValue::from(sql);
936 }
937 let (state, data) = self.query(sql.as_str());
938 match state {
939 true => {
940 let mut list = array![];
941 for item in data.members() {
942 if self.params.json[field].is_empty() {
943 let _ = list.push(item[field].clone());
944 } else {
945 let data =
946 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
947 let _ = list.push(data);
948 }
949 }
950 list
951 }
952 false => {
953 array![]
954 }
955 }
956 }
957
958 fn count(&mut self) -> JsonValue {
959 self.params.fields["count"] = "count(*) as count".to_string().into();
960 let sql = self.params.select_sql();
961 if self.params.sql {
962 return JsonValue::from(sql.clone());
963 }
964 let (state, data) = self.query(sql.as_str());
965 if state {
966 data[0]["count"].clone()
967 } else {
968 JsonValue::from(0)
969 }
970 }
971
972 fn max(&mut self, field: &str) -> JsonValue {
973 self.params.fields[field] = format!("max({field}) as {field}").into();
974 let sql = self.params.select_sql();
975 if self.params.sql {
976 return JsonValue::from(sql.clone());
977 }
978 let (state, data) = self.query(sql.as_str());
979 if state {
980 if data.len() > 1 {
981 return data.clone();
982 }
983 data[0][field].clone()
984 } else {
985 JsonValue::from(0)
986 }
987 }
988
989 fn min(&mut self, field: &str) -> JsonValue {
990 self.params.fields[field] = format!("min({field}) as {field}").into();
991 let sql = self.params.select_sql();
992 if self.params.sql {
993 return JsonValue::from(sql.clone());
994 }
995 let (state, data) = self.query(sql.as_str());
996 if state {
997 if data.len() > 1 {
998 return data;
999 }
1000 data[0][field].clone()
1001 } else {
1002 JsonValue::from(0)
1003 }
1004 }
1005
1006 fn sum(&mut self, field: &str) -> JsonValue {
1007 self.params.fields[field] = format!("sum({field}) as {field}").into();
1008 let sql = self.params.select_sql();
1009 if self.params.sql {
1010 return JsonValue::from(sql.clone());
1011 }
1012 let (state, data) = self.query(sql.as_str());
1013 match state {
1014 true => {
1015 if data.len() > 1 {
1016 return data;
1017 }
1018 data[0][field].clone()
1019 }
1020 false => JsonValue::from(0),
1021 }
1022 }
1023
1024 fn avg(&mut self, field: &str) -> JsonValue {
1025 self.params.fields[field] = format!("avg({field}) as {field}").into();
1026 let sql = self.params.select_sql();
1027 if self.params.sql {
1028 return JsonValue::from(sql.clone());
1029 }
1030 let (state, data) = self.query(sql.as_str());
1031 if state {
1032 if data.len() > 1 {
1033 return data;
1034 }
1035 data[0][field].clone()
1036 } else {
1037 JsonValue::from(0)
1038 }
1039 }
1040
1041 fn select(&mut self) -> JsonValue {
1042 let sql = self.params.select_sql();
1043 if self.params.sql {
1044 return JsonValue::from(sql.clone());
1045 }
1046 let (state, mut data) = self.query(sql.as_str());
1047 match state {
1048 true => {
1049 for (field, _) in self.params.json.entries() {
1050 for item in data.members_mut() {
1051 if !item[field].is_empty() {
1052 let json = item[field].to_string();
1053 item[field] = match json::parse(&json) {
1054 Ok(e) => e,
1055 Err(_) => JsonValue::from(json),
1056 };
1057 }
1058 }
1059 }
1060 data.clone()
1061 }
1062 false => array![],
1063 }
1064 }
1065
1066 fn find(&mut self) -> JsonValue {
1067 self.params.page = 1;
1068 self.params.limit = 1;
1069 let sql = self.params.select_sql();
1070 if self.params.sql {
1071 return JsonValue::from(sql.clone());
1072 }
1073 let (state, mut data) = self.query(sql.as_str());
1074 match state {
1075 true => {
1076 if data.is_empty() {
1077 return object! {};
1078 }
1079 for (field, _) in self.params.json.entries() {
1080 if !data[0][field].is_empty() {
1081 let json = data[0][field].to_string();
1082 let json = json::parse(&json).unwrap_or(array![]);
1083 data[0][field] = json;
1084 } else {
1085 data[0][field] = array![];
1086 }
1087 }
1088 data[0].clone()
1089 }
1090 false => {
1091 object! {}
1092 }
1093 }
1094 }
1095
1096 fn value(&mut self, field: &str) -> JsonValue {
1097 self.params.fields = object! {};
1098 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1099 self.params.page = 1;
1100 self.params.limit = 1;
1101 let sql = self.params.select_sql();
1102 if self.params.sql {
1103 return JsonValue::from(sql.clone());
1104 }
1105 let (state, mut data) = self.query(sql.as_str());
1106 match state {
1107 true => {
1108 for (field, _) in self.params.json.entries() {
1109 if !data[0][field].is_empty() {
1110 let json = data[0][field].to_string();
1111 let json = json::parse(&json).unwrap_or(array![]);
1112 data[0][field] = json;
1113 } else {
1114 data[0][field] = array![];
1115 }
1116 }
1117 data[0][field].clone()
1118 }
1119 false => {
1120 if self.connection.debug {
1121 info!("{data:?}");
1122 }
1123 JsonValue::Null
1124 }
1125 }
1126 }
1127
1128 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1129 let mut fields = vec![];
1130 let mut values = vec![];
1131 if !self.params.autoinc && data["id"].is_empty() {
1132 let thread_id = format!("{:?}", std::thread::current().id());
1133 let thread_num: u64 = thread_id
1134 .trim_start_matches("ThreadId(")
1135 .trim_end_matches(")")
1136 .parse()
1137 .unwrap_or(0);
1138 data["id"] = format!(
1139 "{:X}{:X}",
1140 Local::now().timestamp_nanos_opt().unwrap_or(0),
1141 thread_num
1142 )
1143 .into();
1144 }
1145 for (field, value) in data.entries() {
1146 fields.push(format!("\"{}\"", field));
1147
1148 if value.is_string() {
1149 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1150 continue;
1151 } else if value.is_array() {
1152 if self.params.json[field].is_empty() {
1153 let array = value
1154 .members()
1155 .map(|x| x.as_str().unwrap_or(""))
1156 .collect::<Vec<&str>>()
1157 .join(",");
1158 values.push(format!("'{array}'"));
1159 } else {
1160 let json = value.to_string();
1161 let json = json.replace("'", "''");
1162 values.push(format!("'{json}'"));
1163 }
1164 continue;
1165 } else if value.is_object() {
1166 if self.params.json[field].is_empty() {
1167 values.push(format!("'{value}'"));
1168 } else {
1169 let json = value.to_string();
1170 let json = json.replace("'", "''");
1171 values.push(format!("'{json}'"));
1172 }
1173 continue;
1174 } else if value.is_number() || value.is_boolean() || value.is_null() {
1175 values.push(format!("{value}"));
1176 continue;
1177 } else {
1178 values.push(format!("'{value}'"));
1179 continue;
1180 }
1181 }
1182 let fields = fields.join(",");
1183 let values = values.join(",");
1184
1185 let sql = format!(
1186 "INSERT INTO {} ({}) VALUES ({});",
1187 self.params.table, fields, values
1188 );
1189 if self.params.sql {
1190 return JsonValue::from(sql.clone());
1191 }
1192 let (state, ids) = self.execute(sql.as_str());
1193
1194 match state {
1195 true => match self.params.autoinc {
1196 true => ids.clone(),
1197 false => data["id"].clone(),
1198 },
1199 false => {
1200 let thread_id = format!("{:?}", thread::current().id());
1201 error!("添加失败: {thread_id} {ids:?} {sql}");
1202 JsonValue::from("")
1203 }
1204 }
1205 }
1206 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1207 let mut fields = String::new();
1208 if !self.params.autoinc && data[0]["id"].is_empty() {
1209 data[0]["id"] = "".into();
1210 }
1211 for (field, _) in data[0].entries() {
1212 fields = format!("{fields},\"{field}\"");
1213 }
1214 fields = fields.trim_start_matches(",").to_string();
1215
1216 let core_count = num_cpus::get();
1217 let mut p = pools::Pool::new(core_count * 4);
1218
1219 let autoinc = self.params.autoinc;
1220 for list in data.members() {
1221 let mut item = list.clone();
1222 let i = br_fields::str::Code::verification_code(3);
1223 p.execute(move |pcindex| {
1224 if !autoinc && item["id"].is_empty() {
1225 let id = format!(
1226 "{:X}{:X}{}",
1227 Local::now().timestamp_nanos_opt().unwrap_or(0),
1228 pcindex,
1229 i
1230 );
1231 item["id"] = id.into();
1232 }
1233 let mut values = "".to_string();
1234 for (_, value) in item.entries() {
1235 if value.is_string() {
1236 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1237 } else if value.is_number() {
1238 values = format!("{values},{value}");
1239 } else if value.is_boolean() {
1240 values = format!("{values},{value}");
1241 continue;
1242 } else {
1243 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1244 }
1245 }
1246 values = format!("({})", values.trim_start_matches(","));
1247 array![item["id"].clone(), values]
1248 });
1249 }
1250 let (ids_list, mut values) = p.insert_all();
1251 values = values.trim_start_matches(",").to_string();
1252 let sql = format!(
1253 "INSERT INTO {} ({}) VALUES {};",
1254 self.params.table, fields, values
1255 );
1256
1257 if self.params.sql {
1258 return JsonValue::from(sql.clone());
1259 }
1260 let (state, data) = self.execute(sql.as_str());
1261 match state {
1262 true => match autoinc {
1263 true => data,
1264 false => JsonValue::from(ids_list),
1265 },
1266 false => {
1267 error!("insert_all: {data:?}");
1268 array![]
1269 }
1270 }
1271 }
1272 fn update(&mut self, data: JsonValue) -> JsonValue {
1273 let mut values = vec![];
1274 for (field, value) in data.entries() {
1275 if value.is_string() {
1276 values.push(format!(
1277 "\"{}\"='{}'",
1278 field,
1279 value.to_string().replace("'", "''")
1280 ));
1281 } else if value.is_number() {
1282 values.push(format!("\"{field}\"= {value}"));
1283 } else if value.is_array() {
1284 if self.params.json[field].is_empty() {
1285 let array = value
1286 .members()
1287 .map(|x| x.as_str().unwrap_or(""))
1288 .collect::<Vec<&str>>()
1289 .join(",");
1290 values.push(format!("\"{field}\"='{array}'"));
1291 } else {
1292 let json = value.to_string();
1293 let json = json.replace("'", "''");
1294 values.push(format!("\"{field}\"='{json}'"));
1295 }
1296 continue;
1297 } else if value.is_object() {
1298 if self.params.json[field].is_empty() {
1299 values.push(format!("\"{field}\"='{value}'"));
1300 } else {
1301 if value.is_empty() {
1302 values.push(format!("\"{field}\"=''"));
1303 continue;
1304 }
1305 let json = value.to_string();
1306 let json = json.replace("'", "''");
1307 values.push(format!("\"{field}\"='{json}'"));
1308 }
1309 continue;
1310 } else if value.is_boolean() {
1311 values.push(format!("\"{field}\"= {value}"));
1312 } else {
1313 values.push(format!("\"{field}\"=\"{value}\""));
1314 }
1315 }
1316
1317 for (field, value) in self.params.inc_dec.entries() {
1318 values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1319 }
1320 if !self.params.update_column.is_empty() {
1321 values.extend(self.params.update_column.clone());
1322 }
1323 let values = values.join(",");
1324
1325 let sql = format!(
1326 "UPDATE {} SET {} {};",
1327 self.params.table.clone(),
1328 values,
1329 self.params.where_sql()
1330 );
1331 if self.params.sql {
1332 return JsonValue::from(sql.clone());
1333 }
1334 let (state, data) = self.execute(sql.as_str());
1335 if state {
1336 data
1337 } else {
1338 let thread_id = format!("{:?}", thread::current().id());
1339 error!("update: {thread_id} {data:?} {sql}");
1340 0.into()
1341 }
1342 }
1343 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1344 let mut values = vec![];
1345
1346 let mut ids = vec![];
1347 for (field, _) in data[0].entries() {
1348 if field == "id" {
1349 continue;
1350 }
1351 let mut fields = vec![];
1352 for row in data.members() {
1353 let value = row[field].clone();
1354 let id = row["id"].clone();
1355 ids.push(id.clone());
1356 if value.is_string() {
1357 fields.push(format!(
1358 "WHEN '{}' THEN '{}'",
1359 id,
1360 value.to_string().replace("'", "''")
1361 ));
1362 } else if value.is_array() || value.is_object() {
1363 if self.params.json[field].is_empty() {
1364 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1365 } else {
1366 let json = value.to_string();
1367 let json = json.replace("'", "''");
1368 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1369 }
1370 continue;
1371 } else if value.is_number() || value.is_boolean() || value.is_null() {
1372 fields.push(format!("WHEN '{id}' THEN {value}"));
1373 } else {
1374 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1375 }
1376 }
1377 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1378 }
1379 self.where_and("id", "in", ids.into());
1380 for (field, value) in self.params.inc_dec.entries() {
1381 values.push(format!("{} = {}", field, value.to_string().clone()));
1382 }
1383
1384 let values = values.join(",");
1385 let sql = format!(
1386 "UPDATE {} SET {} {} {};",
1387 self.params.table.clone(),
1388 values,
1389 self.params.where_sql(),
1390 self.params.page_limit_sql()
1391 );
1392 if self.params.sql {
1393 return JsonValue::from(sql.clone());
1394 }
1395 let (state, data) = self.execute(sql.as_str());
1396 if state {
1397 data
1398 } else {
1399 error!("update_all: {data:?}");
1400 JsonValue::from(0)
1401 }
1402 }
1403
1404 fn delete(&mut self) -> JsonValue {
1405 let sql = format!(
1406 "delete FROM {} {} {};",
1407 self.params.table.clone(),
1408 self.params.where_sql(),
1409 self.params.page_limit_sql()
1410 );
1411 if self.params.sql {
1412 return JsonValue::from(sql.clone());
1413 }
1414 let (state, data) = self.execute(sql.as_str());
1415 match state {
1416 true => data,
1417 false => {
1418 error!("delete 失败>>> {data:?}");
1419 JsonValue::from(0)
1420 }
1421 }
1422 }
1423
1424 fn transaction(&mut self) -> bool {
1425 let thread_id = format!("{:?}", thread::current().id());
1426 let key = format!("{}{}", self.default, thread_id);
1427
1428 if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1429 PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1430 return true;
1431 }
1432
1433 let mut conn = match self.client.get_connect_for_transaction() {
1434 Ok(c) => c,
1435 Err(e) => {
1436 error!("获取事务连接失败: {e}");
1437 return false;
1438 }
1439 };
1440
1441 if !conn.is_valid() {
1442 error!("事务连接无效");
1443 self.client.release_transaction_conn();
1444 return false;
1445 }
1446
1447 if let Err(e) = conn.execute("START TRANSACTION") {
1448 error!("启动事务失败: {e}");
1449 self.client.release_transaction_conn();
1450 return false;
1451 }
1452
1453 if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1454 error!("设置事务隔离级别失败: {e}");
1455 let _ = conn.execute("ROLLBACK");
1456 self.client.release_transaction_conn();
1457 return false;
1458 }
1459
1460 PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1461 true
1462 }
1463 fn commit(&mut self) -> bool {
1464 let thread_id = format!("{:?}", thread::current().id());
1465 let key = format!("{}{}", self.default, thread_id);
1466
1467 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1468 error!("commit: 没有活跃的事务");
1469 return false;
1470 }
1471
1472 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1473 if depth > 1 {
1474 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1475 return true;
1476 }
1477
1478 let commit_result =
1479 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1480
1481 let success = match commit_result {
1482 Some(Ok(_)) => true,
1483 Some(Err(e)) => {
1484 error!("提交事务失败: {e}");
1485 false
1486 }
1487 None => {
1488 error!("提交事务失败: 未找到连接");
1489 false
1490 }
1491 };
1492
1493 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1494 self.client.release_transaction_conn();
1495 success
1496 }
1497
1498 fn rollback(&mut self) -> bool {
1499 let thread_id = format!("{:?}", thread::current().id());
1500 let key = format!("{}{}", self.default, thread_id);
1501
1502 if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1503 error!("rollback: 没有活跃的事务");
1504 return false;
1505 }
1506
1507 let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1508 if depth > 1 {
1509 PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1510 return true;
1511 }
1512
1513 let rollback_result =
1514 PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1515
1516 let success = match rollback_result {
1517 Some(Ok(_)) => true,
1518 Some(Err(e)) => {
1519 error!("回滚失败: {e}");
1520 false
1521 }
1522 None => {
1523 error!("回滚失败: 未找到连接");
1524 false
1525 }
1526 };
1527
1528 PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1529 self.client.release_transaction_conn();
1530 success
1531 }
1532
1533 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1534 let (state, data) = self.query(sql);
1535 match state {
1536 true => Ok(data),
1537 false => Err(data.to_string()),
1538 }
1539 }
1540
1541 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1542 let (state, data) = self.execute(sql);
1543 match state {
1544 true => Ok(data),
1545 false => Err(data.to_string()),
1546 }
1547 }
1548
1549 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1550 self.params.inc_dec[field] = format!("{field} + {num}").into();
1551 self
1552 }
1553
1554 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1555 self.params.inc_dec[field] = format!("{field} - {num}").into();
1556 self
1557 }
1558 fn buildsql(&mut self) -> String {
1559 self.fetch_sql();
1560 let sql = self.select().to_string();
1561 format!("( {} ) {}", sql, self.params.table)
1562 }
1563
1564 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1565 for field in fields {
1566 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1567 }
1568 self
1569 }
1570
1571 fn join(
1572 &mut self,
1573 main_table: &str,
1574 main_fields: &str,
1575 right_table: &str,
1576 right_fields: &str,
1577 ) -> &mut Self {
1578 let main_table = if main_table.is_empty() {
1579 self.params.table.clone()
1580 } else {
1581 main_table.to_string()
1582 };
1583 self.params.join_table = right_table.to_string();
1584 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1585 self
1586 }
1587
1588 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1589 let main_fields = if main_fields.is_empty() {
1590 "id"
1591 } else {
1592 main_fields
1593 };
1594 let second_fields = if second_fields.is_empty() {
1595 self.params.table.clone()
1596 } else {
1597 second_fields.to_string().clone()
1598 };
1599 let sec_table_name = format!("{}{}", table, "_2");
1600 let second_table = format!("{} {}", table, sec_table_name.clone());
1601 self.params.join_table = sec_table_name.clone();
1602 self.params.join.push(format!(
1603 " INNER JOIN {} ON {}.{} = {}.{}",
1604 second_table, self.params.table, main_fields, sec_table_name, second_fields
1605 ));
1606 self
1607 }
1608}