1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use br_pgsql::connect::Connect;
13use br_pgsql::pools::Pools;
14
15lazy_static! {
16 static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
17 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
18}
/// PostgreSQL backend handle: connection settings, query-builder state and a connection pool.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Name given to `connect`; combined with the thread id to key transaction connections.
    pub default: String,
    /// Query-builder state accumulated by the chained builder methods.
    pub params: Params,
    /// Connection pool used for statements that run outside a transaction.
    pub client: Pools,
}
28
29impl Pgsql {
30 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31 let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
32
33 let cp_connection = connection.clone();
34 let config = object! {
35 debug: cp_connection.debug,
36 username: cp_connection.username,
37 userpass: cp_connection.userpass,
38 database: cp_connection.database,
39 hostname: cp_connection.hostname,
40 hostport: port,
41 charset: cp_connection.charset.str(),
42 };
43 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
44
45 let pools = pgsql.pools()?;
46 Ok(Self {
47 connection,
48 default: default.clone(),
49 params: Params::default("pgsql"),
50 client: pools,
51 })
52 }
53
54 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
55 let thread_id = format!("{:?}", thread::current().id());
56 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
60 let key = format!("{}{}", self.default, thread_id);
62 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
63
64 let mut t = db.lock().unwrap();
65 match t.query(sql) {
66 Ok(e) => {
67 if self.connection.debug {
68 info!("查询成功: {} {}", thread_id.clone(), sql);
69 }
70 (true, e.rows)
71 }
72 Err(e) => {
73 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
74 (false, JsonValue::from(e.to_string()))
75 }
76 }
77 } else {
78 let mut guard = match self.client.get_guard() {
80 Ok(g) => g,
81 Err(e) => {
82 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
83 return (false, JsonValue::from(e.to_string()));
84 }
85 };
86
87 let res = guard.conn().query(sql);
88 match res {
89 Ok(e) => {
90 if self.connection.debug {
91 info!("查询成功: {} {}", thread_id.clone(), sql);
92 }
93 (true, e.rows)
94 }
95 Err(e) => {
96 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
97 (false, JsonValue::from(e.to_string()))
98 }
99 }
100 }
102 }
103 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
104 let thread_id = format!("{:?}", thread::current().id());
105
106 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
107 let key = format!("{}{}", self.default, thread_id);
108 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
109 let mut t = db.lock().unwrap();
110 match t.execute(sql) {
111 Ok(e) => {
112 if self.connection.debug {
113 info!("提交成功: {} {}", thread_id.clone(), sql);
114 }
115 if sql.contains("INSERT") {
116 (true, e.rows)
117 } else {
118 (true, e.affect_count.into())
119 }
120 }
121 Err(e) => {
122 error!("事务提交失败: {thread_id} {e} {sql}");
123 (false, JsonValue::from(e.to_string()))
124 }
125 }
126 } else {
127 let mut guard = match self.client.get_guard() {
129 Ok(g) => g,
130 Err(e) => {
131 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
132 return (false, JsonValue::from(e.to_string()));
133 }
134 };
135
136 let res = guard.conn().execute(sql);
137 match res {
138 Ok(e) => {
139 if self.connection.debug {
140 info!("提交成功: {} {}", thread_id.clone(), sql);
141 }
142 if sql.contains("INSERT") {
143 (true, e.rows)
144 } else {
145 (true, e.affect_count.into())
146 }
147 }
148 Err(e) => {
149 error!("非事务提交失败: {thread_id} {e} {sql}");
150 (false, JsonValue::from(e.to_string()))
151 }
152 }
153 }
155 }
156}
157
158impl DbMode for Pgsql {
159 fn database_tables(&mut self) -> JsonValue {
160 let sql = "SHOW TABLES".to_string();
161 match self.sql(sql.as_str()) {
162 Ok(e) => {
163 let mut list = vec![];
164 for item in e.members() {
165 for (_, value) in item.entries() {
166 list.push(value.clone());
167 }
168 }
169 list.into()
170 }
171 Err(_) => {
172 array![]
173 }
174 }
175 }
176
177 fn database_create(&mut self, name: &str) -> bool {
178 let sql = format!("CREATE DATABASE {name}");
179
180 let (state, data) = self.execute(sql.as_str());
181 match state {
182 true => data.as_bool().unwrap(),
183 false => {
184 error!("创建数据库失败: {data:?}");
185 false
186 }
187 }
188 }
189}
190
191impl Mode for Pgsql {
192 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
193 let mut sql = String::new();
194 let mut comments = vec![];
195
196 if !options.table_unique.is_empty() {
198 let unique = format!(
199 "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
200 options.table_unique.join("_"),
201 options.table_name,
202 options.table_unique.join(",")
203 );
204 comments.push(unique);
205 }
206
207 for row in options.table_index.iter() {
209 let index = format!(
210 "CREATE INDEX {01}_index_{} ON {} ({})",
211 row.join("_"),
212 options.table_name,
213 row.join(",")
214 );
215 comments.push(index);
216 }
217
218 for (name, field) in options.table_fields.entries_mut() {
219 field["table_name"] = options.table_name.clone().into();
220 let row = br_fields::field("pgsql", name, field.clone());
221 let rows = row.split("comment").collect::<Vec<&str>>();
222 comments.push(format!(
223 "COMMENT ON COLUMN {}.{} IS {};",
224 options.table_name, name, rows[1]
225 ));
226 sql = format!("{} {},\r\n", sql, rows[0]);
227 }
228
229 let primary_key = format!(
230 "CONSTRAINT {}_{} PRIMARY KEY ({})",
231 options.table_name, options.table_key, options.table_key
232 );
233 let sql = format!(
234 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
235 options.table_name,
236 sql.trim_end_matches(",\r\n"),
237 primary_key
238 );
239 comments.insert(0, sql);
240
241 for (_name, field) in options.table_fields.entries() {
242 field["mode"].as_str().unwrap();
243 {}
244 }
245
246 if self.params.sql {
247 let info = comments.join("\r\n");
248 return JsonValue::from(info);
249 }
250 for comment in comments {
251 let (state, _) = self.execute(comment.as_str());
252 match state {
253 true => {}
254 false => {
255 return JsonValue::from(state);
256 }
257 }
258 }
259 JsonValue::from(true)
260 }
261
/// Brings an existing table in line with `options`: adds, drops and alters
/// columns, refreshes column comments and reconciles indexes.
///
/// Returns the generated statements as one string when `fetch_sql` is
/// enabled, `-1` when there is nothing to change, `0` when a statement
/// fails (the failure is logged) and `1` when every statement succeeds.
fn table_update(&mut self, options: TableOptions) -> JsonValue {
    let fields_list = self.table_info(&options.table_name);
    let mut put = vec![];
    let mut add = vec![];
    let mut del = vec![];
    let mut comments = vec![];

    // Columns present in the database but absent from the definition are dropped.
    for (key, _) in fields_list.entries() {
        if options.table_fields[key].is_empty() {
            del.push(key);
        }
    }
    // Existing columns are altered only when their comment text differs;
    // unknown columns are added.
    for (name, field) in options.table_fields.entries() {
        if !fields_list[name].is_empty() {
            let old_comment = fields_list[name]["comment"].to_string();
            let new_comment = br_fields::field("pgsql", name, field.clone());
            let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
            let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
            if old_comment == new_comment_text {
                continue;
            }
            put.push(name);
        } else {
            add.push(name);
        }
    }

    // New columns: ADD statement plus column comment.
    for name in add.iter() {
        let name = name.to_string();
        let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
        let rows = row.split("comment").collect::<Vec<&str>>();
        comments.push(format!(
            r#"ALTER TABLE "{}" add {};"#,
            options.table_name, rows[0]
        ));
        comments.push(format!(
            "COMMENT ON COLUMN {}.{} IS {};",
            options.table_name, name, rows[1]
        ));
    }
    // Removed columns.
    for name in del.iter() {
        comments.push(format!(
            "ALTER TABLE {} DROP {};\r\n",
            options.table_name, name
        ));
    }
    // Changed columns: type, optional default, then comment.
    for name in put.iter() {
        let name = name.to_string();
        let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
        let rows = row.split("comment").collect::<Vec<&str>>();

        // Space-separated tokens of the column definition; index 1 is used as
        // the type and index 3 as the default value below.
        let sql = rows[0].split(" ").collect::<Vec<&str>>();

        if sql[1].contains("BOOLEAN") {
            // The default is dropped before the type change, and the existing
            // values are cast with USING.
            let text = format!(
                "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
                options.table_name, name
            );
            comments.push(text.clone());
            let text = format!(
                "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
                options.table_name, name, sql[1]
            );
            comments.push(text.clone());
        } else {
            let text = format!(
                "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
                options.table_name, name, sql[1]
            );
            comments.push(text.clone());
        };

        if sql.len() > 3 && !sql[3].is_empty() {
            let text = format!(
                "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
                options.table_name, name, sql[3]
            );
            comments.push(text.clone());
        }

        comments.push(format!(
            "COMMENT ON COLUMN {}.{} IS {};",
            options.table_name, name, rows[1]
        ));
    }

    // Existing indexes, sorted into primary key / unique / plain. Names still
    // left in `unique_new` / `index_new` after reconciliation are dropped.
    let mut unique_new = vec![];
    let mut index_new = vec![];
    let mut primary_key = vec![];
    // NOTE(review): the success flag of this query is ignored; a failed query
    // yields a non-array value, so the loop below simply sees no indexes.
    let (_, index_list) = self.query(
        format!(
            "SELECT * FROM pg_indexes WHERE tablename = '{}'",
            options.table_name
        ).as_str(),
    );
    for item in index_list.members() {
        let key_name = item["indexname"].as_str().unwrap();
        let indexdef = item["indexdef"].to_string();

        // The index named "<table>_<key>" backs the primary-key constraint
        // created by `table_create`; it is never dropped here.
        if indexdef.contains(
            format!(
                "CREATE UNIQUE INDEX {}_{} ON",
                options.table_name, options.table_key
            ).as_str(),
        ) {
            primary_key.push(key_name.to_string());
            continue;
        }
        if indexdef.contains("CREATE UNIQUE INDEX") {
            unique_new.push(key_name.to_string());
            continue;
        }
        if indexdef.contains("CREATE INDEX") {
            index_new.push(key_name.to_string());
            continue;
        }
    }

    if !options.table_unique.is_empty() {
        let name = format!(
            "{}_unique_{}",
            options.table_name,
            options.table_unique.join("_")
        );
        let unique = format!(
            "CREATE UNIQUE INDEX {} ON {} ({});",
            name,
            options.table_name,
            options.table_unique.join(",")
        );
        if !unique_new.contains(&name) {
            comments.push(unique);
        } else {
            // Already present: keep it by removing it from the drop list.
            unique_new.retain(|x| *x != name);
        }
    }

    for row in options.table_index.iter() {
        let name = format!("{}_index_{}", options.table_name, row.join("_"));
        let index = format!(
            "CREATE INDEX {} ON {} ({})",
            name,
            options.table_name,
            row.join(",")
        );
        if !index_new.contains(&name) {
            comments.push(index);
        } else {
            // Already present: keep it by removing it from the drop list.
            index_new.retain(|x| *x != name);
        }
    }

    // Whatever is left exists in the database but not in the definition.
    for item in unique_new {
        comments.push(format!("DROP INDEX {};\r\n", item.clone()));
    }
    for item in index_new {
        comments.push(format!("DROP INDEX {};\r\n", item.clone()));
    }

    if self.params.sql {
        return JsonValue::from(comments.join(""));
    }

    if comments.is_empty() {
        return JsonValue::from(-1);
    }

    for item in comments.iter() {
        let (state, res) = self.execute(item.as_str());
        match state {
            true => {}
            false => {
                error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
                return JsonValue::from(0);
            }
        }
    }
    JsonValue::from(1)
}
442
443 fn table_info(&mut self, table: &str) -> JsonValue {
444 let sql = format!(
445 "SELECT COL.COLUMN_NAME,
446 COL.DATA_TYPE,
447 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
448 LEFT JOIN
449 pg_catalog.pg_description DESCRIPTION
450 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
451 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
452 let (state, data) = self.query(sql.as_str());
453 let mut list = object! {};
454 if state {
455 for item in data.members() {
456 let mut row = object! {};
457 row["field"] = item["column_name"].clone();
458 row["comment"] = item["comment"].clone();
459 row["type"] = item["data_type"].clone();
460 list[row["field"].as_str().unwrap()] = row.clone();
461 }
462 list
463 } else {
464 list
465 }
466 }
467
468 fn table_is_exist(&mut self, name: &str) -> bool {
469 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
470 let (state, data) = self.query(sql.as_str());
471 match state {
472 true => {
473 for item in data.members() {
474 if item.has_key("exists") {
475 return item["exists"].as_bool().unwrap();
476 }
477 }
478 false
479 }
480 false => false,
481 }
482 }
483
484 fn table(&mut self, name: &str) -> &mut Pgsql {
485 self.params = Params::default(self.connection.mode.str().as_str());
486 self.params.table = format!("{}{}", self.connection.prefix, name);
487 self.params.join_table = self.params.table.clone();
488 self
489 }
490
491 fn change_table(&mut self, name: &str) -> &mut Self {
492 self.params.join_table = name.to_string();
493 self
494 }
495
496 fn autoinc(&mut self) -> &mut Self {
497 self.params.autoinc = true;
498 self
499 }
500
501 fn fetch_sql(&mut self) -> &mut Self {
502 self.params.sql = true;
503 self
504 }
505
506 fn order(&mut self, field: &str, by: bool) -> &mut Self {
507 self.params.order[field] = {
508 if by {
509 "DESC"
510 } else {
511 "ASC"
512 }
513 }.into();
514 self
515 }
516
517 fn group(&mut self, field: &str) -> &mut Self {
518 let fields: Vec<&str> = field.split(",").collect();
519 for field in fields.iter() {
520 let field = field.to_string();
521 self.params.group[field.as_str()] = field.clone().into();
522 self.params.fields[field.as_str()] = field.clone().into();
523 }
524 self
525 }
526
527 fn distinct(&mut self) -> &mut Self {
528 self.params.distinct = true;
529 self
530 }
531
532 fn json(&mut self, field: &str) -> &mut Self {
533 let list: Vec<&str> = field.split(",").collect();
534 for item in list.iter() {
535 self.params.json[item.to_string().as_str()] = item.to_string().into();
536 }
537 self
538 }
539
540 fn location(&mut self, field: &str) -> &mut Self {
541 let list: Vec<&str> = field.split(",").collect();
542 for item in list.iter() {
543 self.params.location[item.to_string().as_str()] = item.to_string().into();
544 }
545 self
546 }
547
548 fn field(&mut self, field: &str) -> &mut Self {
549 let list: Vec<&str> = field.split(",").collect();
550 let join_table = if self.params.join_table.is_empty() {
551 self.params.table.clone()
552 } else {
553 self.params.join_table.clone()
554 };
555 for item in list.iter() {
556 if item.contains(" as ") {
557 let text = item.split(" as ").collect::<Vec<&str>>();
558 if text[0].contains("count(") {
559 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
560 } else {
561 self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
562 }
563 } else {
564 self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
565 }
566 }
567 self
568 }
569
570 fn hidden(&mut self, name: &str) -> &mut Self {
571 let hidden: Vec<&str> = name.split(",").collect();
572
573 let fields_list = self.table_info(self.params.clone().table.as_str());
574 let mut data = array![];
575 for item in fields_list.members() {
576 data.push(object! {
577 "name":item["field"].as_str().unwrap()
578 }).unwrap();
579 }
580
581 for item in data.members() {
582 let name = item["name"].as_str().unwrap();
583 if !hidden.contains(&name) {
584 self.params.fields[name] = name.into();
585 }
586 }
587 self
588 }
589
590 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
591 let join_table = if self.params.join_table.is_empty() {
592 self.params.table.clone()
593 } else {
594 self.params.join_table.clone()
595 };
596 if value.is_boolean() {
597 if value.as_bool().unwrap() {
598 value = 1.into();
599 } else {
600 value = 0.into();
601 }
602 }
603 match compare {
604 "between" => {
605 self.params.where_and.push(format!(
606 "{}.{} between '{}' AND '{}'",
607 join_table, field, value[0], value[1]
608 ));
609 }
610 "set" => {
611 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
612 let mut wheredata = vec![];
613 for item in list.iter() {
614 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
615 }
616 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
617 }
618 "notin" => {
619 let mut text = String::new();
620 for item in value.members() {
621 text = format!("{text},'{item}'");
622 }
623 text = text.trim_start_matches(",").into();
624 self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
625 }
626 "is" => {
627 self.params.where_and.push(format!("{join_table}.{field} is {value}"));
628 }
629 "notlike" => {
630 self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
631 }
632 "in" => {
633 let mut text = String::new();
634 if value.is_array() {
635 for item in value.members() {
636 text = format!("{text},'{item}'");
637 }
638 } else if value.is_null() {
639 text = format!("{text},null");
640 } else {
641 let value = value.as_str().unwrap();
642
643 let value: Vec<&str> = value.split(",").collect();
644 for item in value.iter() {
645 text = format!("{text},'{item}'");
646 }
647 }
648 text = text.trim_start_matches(",").into();
649
650 self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
651 }
652 _ => {
653 self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
654 }
655 }
656 self
657 }
658
659 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
660 let join_table = if self.params.join_table.is_empty() {
661 self.params.table.clone()
662 } else {
663 self.params.join_table.clone()
664 };
665
666 if value.is_boolean() {
667 if value.as_bool().unwrap() {
668 value = 1.into();
669 } else {
670 value = 0.into();
671 }
672 }
673
674 match compare {
675 "between" => {
676 self.params.where_or.push(format!(
677 "{}.{} between '{}' AND '{}'",
678 join_table, field, value[0], value[1]
679 ));
680 }
681 "set" => {
682 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
683 let mut wheredata = vec![];
684 for item in list.iter() {
685 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
686 }
687 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
688 }
689 "notin" => {
690 let mut text = String::new();
691 for item in value.members() {
692 text = format!("{text},'{item}'");
693 }
694 text = text.trim_start_matches(",").into();
695 self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
696 }
697 "in" => {
698 let mut text = String::new();
699 if value.is_array() {
700 for item in value.members() {
701 text = format!("{text},'{item}'");
702 }
703 } else {
704 let value = value.as_str().unwrap();
705 let value: Vec<&str> = value.split(",").collect();
706 for item in value.iter() {
707 text = format!("{text},'{item}'");
708 }
709 }
710 text = text.trim_start_matches(",").into();
711 self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
712 }
713 _ => {
714 self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
715 }
716 }
717 self
718 }
719
720 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
721 self.params.where_column = format!(
722 "{}.{} {} {}.{}",
723 self.params.table, field_a, compare, self.params.table, field_b
724 );
725 self
726 }
727
728 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
729 self.params.page = page;
730 self.params.limit = limit;
731 self
732 }
733
734 fn column(&mut self, field: &str) -> JsonValue {
735 self.field(field);
736 let sql = self.params.select_sql();
737
738 if self.params.sql {
739 return JsonValue::from(sql);
740 }
741 let (state, data) = self.query(sql.as_str());
742 match state {
743 true => {
744 let mut list = array![];
745 for item in data.members() {
746 if self.params.json[field].is_empty() {
747 list.push(item[field].clone()).unwrap();
748 } else {
749 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
750 list.push(data).unwrap();
751 }
752 }
753 list
754 }
755 false => {
756 array![]
757 }
758 }
759 }
760
761 fn count(&mut self) -> JsonValue {
762 self.params.fields["count"] = "count(*) as count".to_string().into();
763 let sql = self.params.select_sql();
764 if self.params.sql {
765 return JsonValue::from(sql.clone());
766 }
767 let (state, data) = self.query(sql.as_str());
768 if state {
769 data[0]["count"].clone()
770 } else {
771 JsonValue::from(0)
772 }
773 }
774
775 fn max(&mut self, field: &str) -> JsonValue {
776 self.params.fields[field] = format!("max({field}) as {field}").into();
777 let sql = self.params.select_sql();
778 if self.params.sql {
779 return JsonValue::from(sql.clone());
780 }
781 let (state, data) = self.query(sql.as_str());
782 if state {
783 if data.len() > 1 {
784 return data.clone();
785 }
786 data[0][field].clone()
787 } else {
788 JsonValue::from(0)
789 }
790 }
791
792 fn min(&mut self, field: &str) -> JsonValue {
793 self.params.fields[field] = format!("min({field}) as {field}").into();
794 let sql = self.params.select_sql();
795 if self.params.sql {
796 return JsonValue::from(sql.clone());
797 }
798 let (state, data) = self.query(sql.as_str());
799 if state {
800 if data.len() > 1 {
801 return data;
802 }
803 data[0][field].clone()
804 } else {
805 JsonValue::from(0)
806 }
807 }
808
809 fn sum(&mut self, field: &str) -> JsonValue {
810 self.params.fields[field] = format!("sum({field}) as {field}").into();
811 let sql = self.params.select_sql();
812 if self.params.sql {
813 return JsonValue::from(sql.clone());
814 }
815 let (state, data) = self.query(sql.as_str());
816 match state {
817 true => {
818 if data.len() > 1 {
819 return data;
820 }
821 data[0][field].clone()
822 }
823 false => JsonValue::from(0),
824 }
825 }
826
827 fn avg(&mut self, field: &str) -> JsonValue {
828 self.params.fields[field] = format!("avg({field}) as {field}").into();
829 let sql = self.params.select_sql();
830 if self.params.sql {
831 return JsonValue::from(sql.clone());
832 }
833 let (state, data) = self.query(sql.as_str());
834 if state {
835 if data.len() > 1 {
836 return data;
837 }
838 data[0][field].clone()
839 } else {
840 JsonValue::from(0)
841 }
842 }
843
844 fn select(&mut self) -> JsonValue {
845 let sql = self.params.select_sql();
846 if self.params.sql {
847 return JsonValue::from(sql.clone());
848 }
849 let (state, mut data) = self.query(sql.as_str());
850 match state {
851 true => {
852 for (field, _) in self.params.json.entries() {
853 for item in data.members_mut() {
854 if !item[field].is_empty() {
855 let json = item[field].to_string();
856 item[field] = match json::parse(&json) {
857 Ok(e) => e,
858 Err(_) => JsonValue::from(json),
859 };
860 }
861 }
862 }
863 data.clone()
864 }
865 false => array![],
866 }
867 }
868
869 fn find(&mut self) -> JsonValue {
870 self.params.page = 1;
871 self.params.limit = 1;
872 let sql = self.params.select_sql();
873 if self.params.sql {
874 return JsonValue::from(sql.clone());
875 }
876 let (state, mut data) = self.query(sql.as_str());
877 match state {
878 true => {
879 if data.is_empty() {
880 return object! {};
881 }
882 for (field, _) in self.params.json.entries() {
883 if !data[0][field].is_empty() {
884 let json = data[0][field].to_string();
885 let json = json::parse(&json).unwrap_or(array![]);
886 data[0][field] = json;
887 } else {
888 data[0][field] = array![];
889 }
890 }
891 data[0].clone()
892 }
893 false => {
894 object! {}
895 }
896 }
897 }
898
899 fn value(&mut self, field: &str) -> JsonValue {
900 self.params.fields = object! {};
901 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
902 self.params.page = 1;
903 self.params.limit = 1;
904 let sql = self.params.select_sql();
905 if self.params.sql {
906 return JsonValue::from(sql.clone());
907 }
908 let (state, mut data) = self.query(sql.as_str());
909 match state {
910 true => {
911 for (field, _) in self.params.json.entries() {
912 if !data[0][field].is_empty() {
913 let json = data[0][field].to_string();
914 let json = json::parse(&json).unwrap_or(array![]);
915 data[0][field] = json;
916 } else {
917 data[0][field] = array![];
918 }
919 }
920 data[0][field].clone()
921 }
922 false => {
923 if self.connection.debug {
924 info!("{data:?}");
925 }
926 JsonValue::Null
927 }
928 }
929 }
930
931 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
932 let mut fields = vec![];
933 let mut values = vec![];
934 if !self.params.autoinc && data["id"].is_empty() {
935 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
936 }
937 for (field, value) in data.entries() {
938 fields.push(field);
939
940 if value.is_string() {
941 values.push(format!("'{}'", value.to_string().replace("'", "''")));
942 continue;
943 } else if value.is_array() {
944 if self.params.json[field].is_empty() {
945 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
946 values.push(format!("'{array}'"));
947 } else {
948 let json = value.to_string();
949 let json = json.replace("'", "''");
950 values.push(format!("'{json}'"));
951 }
952 continue;
953 } else if value.is_object() {
954 if self.params.json[field].is_empty() {
955 values.push(format!("'{value}'"));
956 } else {
957 let json = value.to_string();
958 let json = json.replace("'", "''");
959 values.push(format!("'{json}'"));
960 }
961 continue;
962 } else if value.is_number() || value.is_boolean() || value.is_null() {
963 values.push(format!("{value}"));
964 continue;
965 } else {
966 values.push(format!("'{value}'"));
967 continue;
968 }
969 }
970 let fields = fields.join(",");
971 let values = values.join(",");
972
973 let sql = format!(
974 "INSERT INTO {} ({}) VALUES ({});",
975 self.params.table, fields, values
976 );
977 if self.params.sql {
978 return JsonValue::from(sql.clone());
979 }
980 let (state, ids) = self.execute(sql.as_str());
981
982 match state {
983 true => match self.params.autoinc {
984 true => ids.clone(),
985 false => data["id"].clone(),
986 },
987 false => {
988 let thread_id = format!("{:?}", thread::current().id());
989 error!("添加失败: {thread_id} {ids:?} {sql}");
990 JsonValue::from("")
991 }
992 }
993 }
994 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
995 let mut fields = String::new();
996 if !self.params.autoinc && data[0]["id"].is_empty() {
997 data[0]["id"] = "".into();
998 }
999 for (field, _) in data[0].entries() {
1000 fields = format!("{fields},{field}");
1001 }
1002 fields = fields.trim_start_matches(",").parse().unwrap();
1003
1004 let core_count = num_cpus::get();
1005 let mut p = pools::Pool::new(core_count * 4);
1006
1007 let autoinc = self.params.autoinc;
1008 for list in data.members() {
1009 let mut item = list.clone();
1010 let i = br_fields::str::Code::verification_code(3);
1011 p.execute(move |pcindex| {
1012 if !autoinc && item["id"].is_empty() {
1013 let id = format!(
1014 "{:X}{:X}{}",
1015 Local::now().timestamp_nanos_opt().unwrap(),
1016 pcindex,
1017 i
1018 );
1019 item["id"] = id.into();
1020 }
1021 let mut values = "".to_string();
1022 for (_, value) in item.entries() {
1023 if value.is_string() {
1024 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1025 } else if value.is_number() {
1026 values = format!("{values},{value}");
1027 } else if value.is_boolean() {
1028 values = format!("{values},{value}");
1029 continue;
1030 } else {
1031 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1032 }
1033 }
1034 values = format!("({})", values.trim_start_matches(","));
1035 array![item["id"].clone(), values]
1036 });
1037 }
1038 let (ids_list, mut values) = p.insert_all();
1039 values = values.trim_start_matches(",").parse().unwrap();
1040 let sql = format!(
1041 "INSERT INTO {} ({}) VALUES {};",
1042 self.params.table, fields, values
1043 );
1044
1045 if self.params.sql {
1046 return JsonValue::from(sql.clone());
1047 }
1048 let (state, data) = self.execute(sql.as_str());
1049 match state {
1050 true => match autoinc {
1051 true => data,
1052 false => JsonValue::from(ids_list),
1053 },
1054 false => {
1055 error!("insert_all: {data:?}");
1056 array![]
1057 }
1058 }
1059 }
1060 fn update(&mut self, data: JsonValue) -> JsonValue {
1061 let mut values = vec![];
1062 for (field, value) in data.entries() {
1063 if value.is_string() {
1064 values.push(format!(
1065 "{}='{}'",
1066 field,
1067 value.to_string().replace("'", "''")
1068 ));
1069 } else if value.is_number() {
1070 values.push(format!("{field}= {value}"));
1071 } else if value.is_array() {
1072 if self.params.json[field].is_empty() {
1073 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1074 values.push(format!("{field}='{array}'"));
1075 } else {
1076 let json = value.to_string();
1077 let json = json.replace("'", "''");
1078 values.push(format!("{field}='{json}'"));
1079 }
1080 continue;
1081 } else if value.is_object() {
1082 if self.params.json[field].is_empty() {
1083 values.push(format!("{field}='{value}'"));
1084 } else {
1085 if value.is_empty() {
1086 values.push(format!("{field}=''"));
1087 continue;
1088 }
1089 let json = value.to_string();
1090 let json = json.replace("'", "''");
1091 values.push(format!("{field}='{json}'"));
1092 }
1093 continue;
1094 } else if value.is_boolean() {
1095 values.push(format!("{field}= {value}"));
1096 } else {
1097 values.push(format!("{field}=\"{value}\""));
1098 }
1099 }
1100
1101 for (field, value) in self.params.inc_dec.entries() {
1102 values.push(format!("{} = {}", field, value.to_string().clone()));
1103 }
1104
1105 let values = values.join(",");
1106
1107 let sql = format!(
1108 "UPDATE {} SET {} {};",
1109 self.params.table.clone(),
1110 values,
1111 self.params.where_sql()
1112 );
1113 if self.params.sql {
1114 return JsonValue::from(sql.clone());
1115 }
1116 let (state, data) = self.execute(sql.as_str());
1117 if state {
1118 data
1119 } else {
1120 let thread_id = format!("{:?}", thread::current().id());
1121 error!("update: {thread_id} {data:?} {sql}");
1122 0.into()
1123 }
1124 }
1125 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1126 let mut values = vec![];
1127
1128 let mut ids = vec![];
1129 for (field, _) in data[0].entries() {
1130 if field == "id" {
1131 continue;
1132 }
1133 let mut fields = vec![];
1134 for row in data.members() {
1135 let value = row[field].clone();
1136 let id = row["id"].clone();
1137 ids.push(id.clone());
1138 if value.is_string() {
1139 fields.push(format!(
1140 "WHEN '{}' THEN '{}'",
1141 id,
1142 value.to_string().replace("'", "''")
1143 ));
1144 } else if value.is_array() || value.is_object() {
1145 if self.params.json[field].is_empty() {
1146 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1147 } else {
1148 let json = value.to_string();
1149 let json = json.replace("'", "''");
1150 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1151 }
1152 continue;
1153 } else if value.is_number() || value.is_boolean() || value.is_null() {
1154 fields.push(format!("WHEN '{id}' THEN {value}"));
1155 } else {
1156 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1157 }
1158 }
1159 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1160 }
1161 self.where_and("id", "in", ids.into());
1162 for (field, value) in self.params.inc_dec.entries() {
1163 values.push(format!("{} = {}", field, value.to_string().clone()));
1164 }
1165
1166 let values = values.join(",");
1167 let sql = format!(
1168 "UPDATE {} SET {} {} {};",
1169 self.params.table.clone(),
1170 values,
1171 self.params.where_sql(),
1172 self.params.page_limit_sql()
1173 );
1174 if self.params.sql {
1175 return JsonValue::from(sql.clone());
1176 }
1177 let (state, data) = self.execute(sql.as_str());
1178 if state {
1179 data
1180 } else {
1181 error!("update_all: {data:?}");
1182 JsonValue::from(0)
1183 }
1184 }
1185
1186 fn delete(&mut self) -> JsonValue {
1187 let sql = format!(
1188 "delete FROM {} {} {};",
1189 self.params.table.clone(),
1190 self.params.where_sql(),
1191 self.params.page_limit_sql()
1192 );
1193 if self.params.sql {
1194 return JsonValue::from(sql.clone());
1195 }
1196 let (state, data) = self.execute(sql.as_str());
1197 match state {
1198 true => data,
1199 false => {
1200 error!("delete 失败>>> {data:?}");
1201 JsonValue::from(0)
1202 }
1203 }
1204 }
1205
1206 fn transaction(&mut self) -> bool {
1207 let thread_id = format!("{:?}", thread::current().id());
1208
1209 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1210 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1211 t += 1;
1212 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1213 return true;
1214 }
1215
1216 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1217 let key = format!("{}{}", self.default, thread_id);
1218
1219 let mut guard = match self.client.get_guard() {
1221 Ok(g) => g,
1222 Err(e) => {
1223 error!("获取事务连接失败: {e}");
1224 return false;
1225 }
1226 };
1227
1228 let conn = guard.conn().clone();
1229
1230 drop(guard);
1232
1233 TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1234
1235 let sql = "START TRANSACTION;".to_string();
1236 let (state, _) = self.execute(sql.as_str());
1237 match state {
1238 true => {
1239 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1240 let (state, _) = self.execute(sql.as_str());
1241 match state {
1242 true => state,
1243 false => {
1244 TRANS.lock().unwrap().remove(&*thread_id.clone());
1245 TR.lock().unwrap().remove(&key.clone());
1246 state
1247 }
1248 }
1249 }
1250 false => {
1251 TRANS.lock().unwrap().remove(&*thread_id.clone());
1252 TR.lock().unwrap().remove(&key.clone());
1253 state
1254 }
1255 }
1256 }
1257 fn commit(&mut self) -> bool {
1258 let thread_id = format!("{:?}", thread::current().id());
1259 let sql = "COMMIT".to_string();
1260
1261 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1262 if t > 1 {
1263 t -= 1;
1264 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1265 return true;
1266 }
1267 let (state, data) = self.execute(sql.as_str());
1268 TRANS.lock().unwrap().remove(&thread_id);
1269 let key = format!("{}{}", self.default, thread_id);
1270 TR.lock().unwrap().remove(&*key);
1271 match state {
1272 true => {}
1273 false => {
1274 error!("提交事务失败: {data}");
1275 }
1276 }
1277 state
1278 }
1279
1280 fn rollback(&mut self) -> bool {
1281 let thread_id = format!("{:?}", thread::current().id());
1282 let sql = "ROLLBACK".to_string();
1283
1284 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1285 if t > 1 {
1286 t -= 1;
1287 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1288 return true;
1289 }
1290 let (state, data) = self.execute(sql.as_str());
1291 TRANS.lock().unwrap().remove(&thread_id);
1292 let key = format!("{}{}", self.default, thread_id);
1293 TR.lock().unwrap().remove(&*key);
1294 match state {
1295 true => {}
1296 false => {
1297 error!("回滚失败: {data}");
1298 }
1299 }
1300 state
1301 }
1302
1303 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1304 let (state, data) = self.query(sql);
1305 match state {
1306 true => Ok(data),
1307 false => Err(data.to_string()),
1308 }
1309 }
1310
1311 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1312 let (state, data) = self.execute(sql);
1313 match state {
1314 true => Ok(data),
1315 false => Err(data.to_string()),
1316 }
1317 }
1318
1319 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1320 self.params.inc_dec[field] = format!("{field} + {num}").into();
1321 self
1322 }
1323
1324 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1325 self.params.inc_dec[field] = format!("{field} - {num}").into();
1326 self
1327 }
1328 fn buildsql(&mut self) -> String {
1329 self.fetch_sql();
1330 let sql = self.select().to_string();
1331 format!("( {} ) {}", sql, self.params.table)
1332 }
1333
1334 fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1335 let main_table = if self.params.join_table.is_empty() {
1336 self.params.table.clone()
1337 } else {
1338 self.params.join_table.clone()
1339 };
1340 self.params.join_table = table.to_string();
1341 self.params.join.push(format!(" LEFT JOIN {table} ON {main_table}.{main_fields} = {table}.{right_fields}"));
1342 self
1343 }
1344
1345 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1346 let main_fields = if main_fields.is_empty() {
1347 "id"
1348 } else {
1349 main_fields
1350 };
1351 let second_fields = if second_fields.is_empty() {
1352 self.params.table.clone()
1353 } else {
1354 second_fields.to_string().clone()
1355 };
1356 let sec_table_name = format!("{}{}", table, "_2");
1357 let second_table = format!("{} {}", table, sec_table_name.clone());
1358 self.params.join_table = sec_table_name.clone();
1359 self.params.join.push(format!(
1360 " INNER JOIN {} ON {}.{} = {}.{}",
1361 second_table, self.params.table, main_fields, sec_table_name, second_fields
1362 ));
1363 self
1364 }
1365}