1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use br_pgsql::connect::Connect;
13use br_pgsql::pools::Pools;
14
15lazy_static! {
16 static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
17 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
18}
19#[derive(Clone)]
20pub struct Pgsql {
21 pub connection: Connection,
23 pub default: String,
25 pub params: Params,
26 pub client: Pools,
27}
28
29impl Pgsql {
30 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31 let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
32
33 let cp_connection = connection.clone();
34 let config = object! {
35 debug: cp_connection.debug,
36 username: cp_connection.username,
37 userpass: cp_connection.userpass,
38 database: cp_connection.database,
39 hostname: cp_connection.hostname,
40 hostport: port,
41 charset: cp_connection.charset.str(),
42 };
43 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
44
45 let pools = pgsql.pools()?;
46 Ok(Self {
47 connection,
48 default: default.clone(),
49 params: Params::default("pgsql"),
50 client: pools,
51 })
52 }
53
54 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
55 let thread_id = format!("{:?}", thread::current().id());
56 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
60 let key = format!("{}{}", self.default, thread_id);
62 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
63
64 let mut t = db.lock().unwrap();
65 match t.query(sql) {
66 Ok(e) => {
67 if self.connection.debug {
68 info!("查询成功: {} {}", thread_id.clone(), sql);
69 }
70 (true, e.rows)
71 }
72 Err(e) => {
73 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
74 (false, JsonValue::from(e.to_string()))
75 }
76 }
77 } else {
78 let mut guard = match self.client.get_guard() {
80 Ok(g) => g,
81 Err(e) => {
82 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
83 return (false, JsonValue::from(e.to_string()));
84 }
85 };
86
87 let res = guard.conn().query(sql);
88 match res {
89 Ok(e) => {
90 if self.connection.debug {
91 info!("查询成功: {} {}", thread_id.clone(), sql);
92 }
93 (true, e.rows)
94 }
95 Err(e) => {
96 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
97 (false, JsonValue::from(e.to_string()))
98 }
99 }
100 }
102 }
103 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
104 let thread_id = format!("{:?}", thread::current().id());
105
106 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
107 let key = format!("{}{}", self.default, thread_id);
108 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
109 let mut t = db.lock().unwrap();
110 match t.execute(sql) {
111 Ok(e) => {
112 if self.connection.debug {
113 info!("提交成功: {} {}", thread_id.clone(), sql);
114 }
115 if sql.contains("INSERT") {
116 (true, e.rows)
117 } else {
118 (true, e.affect_count.into())
119 }
120 }
121 Err(e) => {
122 error!("事务提交失败: {thread_id} {e} {sql}");
123 (false, JsonValue::from(e.to_string()))
124 }
125 }
126 } else {
127 let mut guard = match self.client.get_guard() {
129 Ok(g) => g,
130 Err(e) => {
131 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
132 return (false, JsonValue::from(e.to_string()));
133 }
134 };
135
136 let res = guard.conn().execute(sql);
137 match res {
138 Ok(e) => {
139 if self.connection.debug {
140 info!("提交成功: {} {}", thread_id.clone(), sql);
141 }
142 if sql.contains("INSERT") {
143 (true, e.rows)
144 } else {
145 (true, e.affect_count.into())
146 }
147 }
148 Err(e) => {
149 error!("非事务提交失败: {thread_id} {e} {sql}");
150 (false, JsonValue::from(e.to_string()))
151 }
152 }
153 }
155 }
156}
157
158impl DbMode for Pgsql {
159 fn database_tables(&mut self) -> JsonValue {
160 let sql = "SHOW TABLES".to_string();
161 match self.sql(sql.as_str()) {
162 Ok(e) => {
163 let mut list = vec![];
164 for item in e.members() {
165 for (_, value) in item.entries() {
166 list.push(value.clone());
167 }
168 }
169 list.into()
170 }
171 Err(_) => {
172 array![]
173 }
174 }
175 }
176
177 fn database_create(&mut self, name: &str) -> bool {
178 let sql = format!("CREATE DATABASE {name}");
179
180 let (state, data) = self.execute(sql.as_str());
181 match state {
182 true => data.as_bool().unwrap(),
183 false => {
184 error!("创建数据库失败: {data:?}");
185 false
186 }
187 }
188 }
189}
190
191impl Mode for Pgsql {
192 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
193 let mut sql = String::new();
194 let mut comments = vec![];
195
196 if !options.table_unique.is_empty() {
198 let unique = format!(
199 "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
200 options.table_unique.join("_"),
201 options.table_name,
202 options.table_unique.join(",")
203 );
204 comments.push(unique);
205 }
206
207 for row in options.table_index.iter() {
209 let index = format!(
210 "CREATE INDEX {01}_index_{} ON {} ({})",
211 row.join("_"),
212 options.table_name,
213 row.join(",")
214 );
215 comments.push(index);
216 }
217
218 for (name, field) in options.table_fields.entries_mut() {
219 field["table_name"] = options.table_name.clone().into();
220 let row = br_fields::field("pgsql", name, field.clone());
221 let rows = row.split("comment").collect::<Vec<&str>>();
222 comments.push(format!(
223 "COMMENT ON COLUMN {}.{} IS {};",
224 options.table_name, name, rows[1]
225 ));
226 sql = format!("{} {},\r\n", sql, rows[0]);
227 }
228
229 let primary_key = format!(
230 "CONSTRAINT {}_{} PRIMARY KEY ({})",
231 options.table_name, options.table_key, options.table_key
232 );
233 let sql = format!(
234 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
235 options.table_name,
236 sql.trim_end_matches(",\r\n"),
237 primary_key
238 );
239 comments.insert(0, sql);
240
241 for (_name, field) in options.table_fields.entries() {
242 field["mode"].as_str().unwrap();
243 {}
244 }
245
246 if self.params.sql {
247 let info = comments.join("\r\n");
248 return JsonValue::from(info);
249 }
250 for comment in comments {
251 let (state, _) = self.execute(comment.as_str());
252 match state {
253 true => {}
254 false => {
255 return JsonValue::from(state);
256 }
257 }
258 }
259 JsonValue::from(true)
260 }
261
262 fn table_update(&mut self, options: TableOptions) -> JsonValue {
263 let fields_list = self.table_info(&options.table_name);
264 let mut put = vec![];
265 let mut add = vec![];
266 let mut del = vec![];
267 let mut comments = vec![];
268
269 for (key, _) in fields_list.entries() {
270 if options.table_fields[key].is_empty() {
271 del.push(key);
272 }
273 }
274 for (name, field) in options.table_fields.entries() {
275 if !fields_list[name].is_empty() {
276 let old_comment = fields_list[name]["comment"].to_string();
277 let new_comment = br_fields::field("pgsql", name, field.clone());
278 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
279 let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
280 if old_comment == new_comment_text {
281 continue;
282 }
283 put.push(name);
284 } else {
285 add.push(name);
286 }
287 }
288
289 for name in add.iter() {
290 let name = name.to_string();
291 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
292 let rows = row.split("comment").collect::<Vec<&str>>();
293 comments.push(format!(
294 r#"ALTER TABLE "{}" add {};"#,
295 options.table_name, rows[0]
296 ));
297 comments.push(format!(
298 "COMMENT ON COLUMN {}.{} IS {};",
299 options.table_name, name, rows[1]
300 ));
301 }
302 for name in del.iter() {
303 comments.push(format!(
304 "ALTER TABLE {} DROP {};\r\n",
305 options.table_name, name
306 ));
307 }
308 for name in put.iter() {
309 let name = name.to_string();
310 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
311 let rows = row.split("comment").collect::<Vec<&str>>();
312
313 let sql = rows[0].split(" ").collect::<Vec<&str>>();
314
315 if sql[1].contains("BOOLEAN") {
316 let text = format!(
317 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
318 options.table_name, name
319 );
320 comments.push(text.clone());
321 let text = format!(
322 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
323 options.table_name, name, sql[1]
324 );
325 comments.push(text.clone());
326 } else {
327 let text = format!(
328 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
329 options.table_name, name, sql[1]
330 );
331 comments.push(text.clone());
332 };
333
334 if sql.len() > 3 && !sql[3].is_empty() {
335 let text = format!(
336 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
337 options.table_name, name, sql[3]
338 );
339 comments.push(text.clone());
340 }
341
342 comments.push(format!(
343 "COMMENT ON COLUMN {}.{} IS {};",
344 options.table_name, name, rows[1]
345 ));
346 }
347
348 let mut unique_new = vec![];
349 let mut index_new = vec![];
350 let mut primary_key = vec![];
351 let (_, index_list) = self.query(
352 format!(
353 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
354 options.table_name
355 ).as_str(),
356 );
357 for item in index_list.members() {
358 let key_name = item["indexname"].as_str().unwrap();
359 let indexdef = item["indexdef"].to_string();
360
361 if indexdef.contains(
362 format!(
363 "CREATE UNIQUE INDEX {}_{} ON",
364 options.table_name, options.table_key
365 ).as_str(),
366 ) {
367 primary_key.push(key_name.to_string());
368 continue;
369 }
370 if indexdef.contains("CREATE UNIQUE INDEX") {
371 unique_new.push(key_name.to_string());
372 continue;
373 }
374 if indexdef.contains("CREATE INDEX") {
375 index_new.push(key_name.to_string());
376 continue;
377 }
378 }
379
380 if !options.table_unique.is_empty() {
381 let name = format!(
382 "{}_unique_{}",
383 options.table_name,
384 options.table_unique.join("_")
385 );
386 let unique = format!(
387 "CREATE UNIQUE INDEX {} ON {} ({});",
388 name,
389 options.table_name,
390 options.table_unique.join(",")
391 );
392 if !unique_new.contains(&name) {
393 comments.push(unique);
394 } else {
395 unique_new.retain(|x| *x != name);
396 }
397 }
398
399 for row in options.table_index.iter() {
401 let name = format!("{}_index_{}", options.table_name, row.join("_"));
402 let index = format!(
403 "CREATE INDEX {} ON {} ({})",
404 name,
405 options.table_name,
406 row.join(",")
407 );
408 if !index_new.contains(&name) {
409 comments.push(index);
410 } else {
411 index_new.retain(|x| *x != name);
412 }
413 }
414
415 for item in unique_new {
416 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
417 }
418 for item in index_new {
419 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
420 }
421
422 if self.params.sql {
423 return JsonValue::from(comments.join(""));
424 }
425
426 if comments.is_empty() {
427 return JsonValue::from(-1);
428 }
429
430 for item in comments.iter() {
431 let (state, res) = self.execute(item.as_str());
432 match state {
433 true => {}
434 false => {
435 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
436 return JsonValue::from(0);
437 }
438 }
439 }
440 JsonValue::from(1)
441 }
442
443 fn table_info(&mut self, table: &str) -> JsonValue {
444 let sql = format!(
445 "SELECT COL.COLUMN_NAME,
446 COL.DATA_TYPE,
447 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
448 LEFT JOIN
449 pg_catalog.pg_description DESCRIPTION
450 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
451 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
452 let (state, data) = self.query(sql.as_str());
453 let mut list = object! {};
454 if state {
455 for item in data.members() {
456 let mut row = object! {};
457 row["field"] = item["column_name"].clone();
458 row["comment"] = item["comment"].clone();
459 row["type"] = item["data_type"].clone();
460 list[row["field"].as_str().unwrap()] = row.clone();
461 }
462 list
463 } else {
464 list
465 }
466 }
467
468 fn table_is_exist(&mut self, name: &str) -> bool {
469 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
470 let (state, data) = self.query(sql.as_str());
471 match state {
472 true => {
473 for item in data.members() {
474 if item.has_key("exists") {
475 return item["exists"].as_bool().unwrap();
476 }
477 }
478 false
479 }
480 false => false,
481 }
482 }
483
484 fn table(&mut self, name: &str) -> &mut Pgsql {
485 self.params = Params::default(self.connection.mode.str().as_str());
486 self.params.table = format!("{}{}", self.connection.prefix, name);
487 self.params.join_table = self.params.table.clone();
488 self
489 }
490
491 fn change_table(&mut self, name: &str) -> &mut Self {
492 self.params.join_table = name.to_string();
493 self
494 }
495
496 fn autoinc(&mut self) -> &mut Self {
497 self.params.autoinc = true;
498 self
499 }
500
501 fn fetch_sql(&mut self) -> &mut Self {
502 self.params.sql = true;
503 self
504 }
505
506 fn order(&mut self, field: &str, by: bool) -> &mut Self {
507 self.params.order[field] = {
508 if by {
509 "DESC"
510 } else {
511 "ASC"
512 }
513 }.into();
514 self
515 }
516
517 fn group(&mut self, field: &str) -> &mut Self {
518 let fields: Vec<&str> = field.split(",").collect();
519 for field in fields.iter() {
520 let field = field.to_string();
521 self.params.group[field.as_str()] = field.clone().into();
522 self.params.fields[field.as_str()] = field.clone().into();
523 }
524 self
525 }
526
527 fn distinct(&mut self) -> &mut Self {
528 self.params.distinct = true;
529 self
530 }
531
532 fn json(&mut self, field: &str) -> &mut Self {
533 let list: Vec<&str> = field.split(",").collect();
534 for item in list.iter() {
535 self.params.json[item.to_string().as_str()] = item.to_string().into();
536 }
537 self
538 }
539
540 fn location(&mut self, field: &str) -> &mut Self {
541 let list: Vec<&str> = field.split(",").collect();
542 for item in list.iter() {
543 self.params.location[item.to_string().as_str()] = item.to_string().into();
544 }
545 self
546 }
547
548 fn field(&mut self, field: &str) -> &mut Self {
549 let list: Vec<&str> = field.split(",").collect();
550 let join_table = if self.params.join_table.is_empty() {
551 self.params.table.clone()
552 } else {
553 self.params.join_table.clone()
554 };
555 for item in list.iter() {
556 if item.contains(" as ") {
557 let text = item.split(" as ").collect::<Vec<&str>>();
558 if text[0].contains("count(") {
559 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
560 } else {
561 self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
562 }
563 } else {
564 self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
565 }
566 }
567 self
568 }
569
570 fn hidden(&mut self, name: &str) -> &mut Self {
571 let hidden: Vec<&str> = name.split(",").collect();
572
573 let fields_list = self.table_info(self.params.clone().table.as_str());
574 let mut data = array![];
575 for item in fields_list.members() {
576 data.push(object! {
577 "name":item["field"].as_str().unwrap()
578 }).unwrap();
579 }
580
581 for item in data.members() {
582 let name = item["name"].as_str().unwrap();
583 if !hidden.contains(&name) {
584 self.params.fields[name] = name.into();
585 }
586 }
587 self
588 }
589
590 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
591 let join_table = if self.params.join_table.is_empty() {
592 self.params.table.clone()
593 } else {
594 self.params.join_table.clone()
595 };
596 if value.is_boolean() {
597 if value.as_bool().unwrap() {
598 value = 1.into();
599 } else {
600 value = 0.into();
601 }
602 }
603 match compare {
604 "between" => {
605 self.params.where_and.push(format!(
606 "{}.{} between '{}' AND '{}'",
607 join_table, field, value[0], value[1]
608 ));
609 }
610 "set" => {
611 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
612 let mut wheredata = vec![];
613 for item in list.iter() {
614 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
615 }
616 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
617 }
618 "notin" => {
619 let mut text = String::new();
620 for item in value.members() {
621 text = format!("{text},'{item}'");
622 }
623 text = text.trim_start_matches(",").into();
624 self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
625 }
626 "is" => {
627 self.params.where_and.push(format!("{join_table}.{field} is {value}"));
628 }
629 "isnot" => {
630 self.params.where_and.push(format!("{join_table}.{field} is not {value}"));
631 }
632 "notlike" => {
633 self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
634 }
635 "in" => {
636 let mut text = String::new();
637 if value.is_array() {
638 for item in value.members() {
639 text = format!("{text},'{item}'");
640 }
641 } else if value.is_null() {
642 text = format!("{text},null");
643 } else {
644 let value = value.as_str().unwrap();
645
646 let value: Vec<&str> = value.split(",").collect();
647 for item in value.iter() {
648 text = format!("{text},'{item}'");
649 }
650 }
651 text = text.trim_start_matches(",").into();
652
653 self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
654 }
655 _ => {
656 self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
657 }
658 }
659 self
660 }
661
662 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
663 let join_table = if self.params.join_table.is_empty() {
664 self.params.table.clone()
665 } else {
666 self.params.join_table.clone()
667 };
668
669 if value.is_boolean() {
670 if value.as_bool().unwrap() {
671 value = 1.into();
672 } else {
673 value = 0.into();
674 }
675 }
676
677 match compare {
678 "between" => {
679 self.params.where_or.push(format!(
680 "{}.{} between '{}' AND '{}'",
681 join_table, field, value[0], value[1]
682 ));
683 }
684 "set" => {
685 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
686 let mut wheredata = vec![];
687 for item in list.iter() {
688 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
689 }
690 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
691 }
692 "notin" => {
693 let mut text = String::new();
694 for item in value.members() {
695 text = format!("{text},'{item}'");
696 }
697 text = text.trim_start_matches(",").into();
698 self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
699 }
700 "is" => {
701 self.params.where_or.push(format!("{join_table}.{field} is {value}"));
702 }
703 "isnot" => {
704 self.params.where_or.push(format!("{join_table}.{field} is not {value}"));
705 }
706 "in" => {
707 let mut text = String::new();
708 if value.is_array() {
709 for item in value.members() {
710 text = format!("{text},'{item}'");
711 }
712 } else {
713 let value = value.as_str().unwrap();
714 let value: Vec<&str> = value.split(",").collect();
715 for item in value.iter() {
716 text = format!("{text},'{item}'");
717 }
718 }
719 text = text.trim_start_matches(",").into();
720 self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
721 }
722 _ => {
723 self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
724 }
725 }
726 self
727 }
728
729 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
730 self.params.where_column = format!(
731 "{}.{} {} {}.{}",
732 self.params.table, field_a, compare, self.params.table, field_b
733 );
734 self
735 }
736
737 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
738 self.params.page = page;
739 self.params.limit = limit;
740 self
741 }
742
743 fn column(&mut self, field: &str) -> JsonValue {
744 self.field(field);
745 let sql = self.params.select_sql();
746
747 if self.params.sql {
748 return JsonValue::from(sql);
749 }
750 let (state, data) = self.query(sql.as_str());
751 match state {
752 true => {
753 let mut list = array![];
754 for item in data.members() {
755 if self.params.json[field].is_empty() {
756 list.push(item[field].clone()).unwrap();
757 } else {
758 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
759 list.push(data).unwrap();
760 }
761 }
762 list
763 }
764 false => {
765 array![]
766 }
767 }
768 }
769
770 fn count(&mut self) -> JsonValue {
771 self.params.fields["count"] = "count(*) as count".to_string().into();
772 let sql = self.params.select_sql();
773 if self.params.sql {
774 return JsonValue::from(sql.clone());
775 }
776 let (state, data) = self.query(sql.as_str());
777 if state {
778 data[0]["count"].clone()
779 } else {
780 JsonValue::from(0)
781 }
782 }
783
784 fn max(&mut self, field: &str) -> JsonValue {
785 self.params.fields[field] = format!("max({field}) as {field}").into();
786 let sql = self.params.select_sql();
787 if self.params.sql {
788 return JsonValue::from(sql.clone());
789 }
790 let (state, data) = self.query(sql.as_str());
791 if state {
792 if data.len() > 1 {
793 return data.clone();
794 }
795 data[0][field].clone()
796 } else {
797 JsonValue::from(0)
798 }
799 }
800
801 fn min(&mut self, field: &str) -> JsonValue {
802 self.params.fields[field] = format!("min({field}) as {field}").into();
803 let sql = self.params.select_sql();
804 if self.params.sql {
805 return JsonValue::from(sql.clone());
806 }
807 let (state, data) = self.query(sql.as_str());
808 if state {
809 if data.len() > 1 {
810 return data;
811 }
812 data[0][field].clone()
813 } else {
814 JsonValue::from(0)
815 }
816 }
817
818 fn sum(&mut self, field: &str) -> JsonValue {
819 self.params.fields[field] = format!("sum({field}) as {field}").into();
820 let sql = self.params.select_sql();
821 if self.params.sql {
822 return JsonValue::from(sql.clone());
823 }
824 let (state, data) = self.query(sql.as_str());
825 match state {
826 true => {
827 if data.len() > 1 {
828 return data;
829 }
830 data[0][field].clone()
831 }
832 false => JsonValue::from(0),
833 }
834 }
835
836 fn avg(&mut self, field: &str) -> JsonValue {
837 self.params.fields[field] = format!("avg({field}) as {field}").into();
838 let sql = self.params.select_sql();
839 if self.params.sql {
840 return JsonValue::from(sql.clone());
841 }
842 let (state, data) = self.query(sql.as_str());
843 if state {
844 if data.len() > 1 {
845 return data;
846 }
847 data[0][field].clone()
848 } else {
849 JsonValue::from(0)
850 }
851 }
852
853 fn select(&mut self) -> JsonValue {
854 let sql = self.params.select_sql();
855 if self.params.sql {
856 return JsonValue::from(sql.clone());
857 }
858 let (state, mut data) = self.query(sql.as_str());
859 match state {
860 true => {
861 for (field, _) in self.params.json.entries() {
862 for item in data.members_mut() {
863 if !item[field].is_empty() {
864 let json = item[field].to_string();
865 item[field] = match json::parse(&json) {
866 Ok(e) => e,
867 Err(_) => JsonValue::from(json),
868 };
869 }
870 }
871 }
872 data.clone()
873 }
874 false => array![],
875 }
876 }
877
878 fn find(&mut self) -> JsonValue {
879 self.params.page = 1;
880 self.params.limit = 1;
881 let sql = self.params.select_sql();
882 if self.params.sql {
883 return JsonValue::from(sql.clone());
884 }
885 let (state, mut data) = self.query(sql.as_str());
886 match state {
887 true => {
888 if data.is_empty() {
889 return object! {};
890 }
891 for (field, _) in self.params.json.entries() {
892 if !data[0][field].is_empty() {
893 let json = data[0][field].to_string();
894 let json = json::parse(&json).unwrap_or(array![]);
895 data[0][field] = json;
896 } else {
897 data[0][field] = array![];
898 }
899 }
900 data[0].clone()
901 }
902 false => {
903 object! {}
904 }
905 }
906 }
907
908 fn value(&mut self, field: &str) -> JsonValue {
909 self.params.fields = object! {};
910 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
911 self.params.page = 1;
912 self.params.limit = 1;
913 let sql = self.params.select_sql();
914 if self.params.sql {
915 return JsonValue::from(sql.clone());
916 }
917 let (state, mut data) = self.query(sql.as_str());
918 match state {
919 true => {
920 for (field, _) in self.params.json.entries() {
921 if !data[0][field].is_empty() {
922 let json = data[0][field].to_string();
923 let json = json::parse(&json).unwrap_or(array![]);
924 data[0][field] = json;
925 } else {
926 data[0][field] = array![];
927 }
928 }
929 data[0][field].clone()
930 }
931 false => {
932 if self.connection.debug {
933 info!("{data:?}");
934 }
935 JsonValue::Null
936 }
937 }
938 }
939
940 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
941 let mut fields = vec![];
942 let mut values = vec![];
943 if !self.params.autoinc && data["id"].is_empty() {
944 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
945 }
946 for (field, value) in data.entries() {
947 fields.push(field);
948
949 if value.is_string() {
950 values.push(format!("'{}'", value.to_string().replace("'", "''")));
951 continue;
952 } else if value.is_array() {
953 if self.params.json[field].is_empty() {
954 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
955 values.push(format!("'{array}'"));
956 } else {
957 let json = value.to_string();
958 let json = json.replace("'", "''");
959 values.push(format!("'{json}'"));
960 }
961 continue;
962 } else if value.is_object() {
963 if self.params.json[field].is_empty() {
964 values.push(format!("'{value}'"));
965 } else {
966 let json = value.to_string();
967 let json = json.replace("'", "''");
968 values.push(format!("'{json}'"));
969 }
970 continue;
971 } else if value.is_number() || value.is_boolean() || value.is_null() {
972 values.push(format!("{value}"));
973 continue;
974 } else {
975 values.push(format!("'{value}'"));
976 continue;
977 }
978 }
979 let fields = fields.join(",");
980 let values = values.join(",");
981
982 let sql = format!(
983 "INSERT INTO {} ({}) VALUES ({});",
984 self.params.table, fields, values
985 );
986 if self.params.sql {
987 return JsonValue::from(sql.clone());
988 }
989 let (state, ids) = self.execute(sql.as_str());
990
991 match state {
992 true => match self.params.autoinc {
993 true => ids.clone(),
994 false => data["id"].clone(),
995 },
996 false => {
997 let thread_id = format!("{:?}", thread::current().id());
998 error!("添加失败: {thread_id} {ids:?} {sql}");
999 JsonValue::from("")
1000 }
1001 }
1002 }
1003 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1004 let mut fields = String::new();
1005 if !self.params.autoinc && data[0]["id"].is_empty() {
1006 data[0]["id"] = "".into();
1007 }
1008 for (field, _) in data[0].entries() {
1009 fields = format!("{fields},{field}");
1010 }
1011 fields = fields.trim_start_matches(",").parse().unwrap();
1012
1013 let core_count = num_cpus::get();
1014 let mut p = pools::Pool::new(core_count * 4);
1015
1016 let autoinc = self.params.autoinc;
1017 for list in data.members() {
1018 let mut item = list.clone();
1019 let i = br_fields::str::Code::verification_code(3);
1020 p.execute(move |pcindex| {
1021 if !autoinc && item["id"].is_empty() {
1022 let id = format!(
1023 "{:X}{:X}{}",
1024 Local::now().timestamp_nanos_opt().unwrap(),
1025 pcindex,
1026 i
1027 );
1028 item["id"] = id.into();
1029 }
1030 let mut values = "".to_string();
1031 for (_, value) in item.entries() {
1032 if value.is_string() {
1033 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1034 } else if value.is_number() {
1035 values = format!("{values},{value}");
1036 } else if value.is_boolean() {
1037 values = format!("{values},{value}");
1038 continue;
1039 } else {
1040 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1041 }
1042 }
1043 values = format!("({})", values.trim_start_matches(","));
1044 array![item["id"].clone(), values]
1045 });
1046 }
1047 let (ids_list, mut values) = p.insert_all();
1048 values = values.trim_start_matches(",").parse().unwrap();
1049 let sql = format!(
1050 "INSERT INTO {} ({}) VALUES {};",
1051 self.params.table, fields, values
1052 );
1053
1054 if self.params.sql {
1055 return JsonValue::from(sql.clone());
1056 }
1057 let (state, data) = self.execute(sql.as_str());
1058 match state {
1059 true => match autoinc {
1060 true => data,
1061 false => JsonValue::from(ids_list),
1062 },
1063 false => {
1064 error!("insert_all: {data:?}");
1065 array![]
1066 }
1067 }
1068 }
1069 fn update(&mut self, data: JsonValue) -> JsonValue {
1070 let mut values = vec![];
1071 for (field, value) in data.entries() {
1072 if value.is_string() {
1073 values.push(format!(
1074 "{}='{}'",
1075 field,
1076 value.to_string().replace("'", "''")
1077 ));
1078 } else if value.is_number() {
1079 values.push(format!("{field}= {value}"));
1080 } else if value.is_array() {
1081 if self.params.json[field].is_empty() {
1082 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1083 values.push(format!("{field}='{array}'"));
1084 } else {
1085 let json = value.to_string();
1086 let json = json.replace("'", "''");
1087 values.push(format!("{field}='{json}'"));
1088 }
1089 continue;
1090 } else if value.is_object() {
1091 if self.params.json[field].is_empty() {
1092 values.push(format!("{field}='{value}'"));
1093 } else {
1094 if value.is_empty() {
1095 values.push(format!("{field}=''"));
1096 continue;
1097 }
1098 let json = value.to_string();
1099 let json = json.replace("'", "''");
1100 values.push(format!("{field}='{json}'"));
1101 }
1102 continue;
1103 } else if value.is_boolean() {
1104 values.push(format!("{field}= {value}"));
1105 } else {
1106 values.push(format!("{field}=\"{value}\""));
1107 }
1108 }
1109
1110 for (field, value) in self.params.inc_dec.entries() {
1111 values.push(format!("{} = {}", field, value.to_string().clone()));
1112 }
1113
1114 let values = values.join(",");
1115
1116 let sql = format!(
1117 "UPDATE {} SET {} {};",
1118 self.params.table.clone(),
1119 values,
1120 self.params.where_sql()
1121 );
1122 if self.params.sql {
1123 return JsonValue::from(sql.clone());
1124 }
1125 let (state, data) = self.execute(sql.as_str());
1126 if state {
1127 data
1128 } else {
1129 let thread_id = format!("{:?}", thread::current().id());
1130 error!("update: {thread_id} {data:?} {sql}");
1131 0.into()
1132 }
1133 }
1134 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1135 let mut values = vec![];
1136
1137 let mut ids = vec![];
1138 for (field, _) in data[0].entries() {
1139 if field == "id" {
1140 continue;
1141 }
1142 let mut fields = vec![];
1143 for row in data.members() {
1144 let value = row[field].clone();
1145 let id = row["id"].clone();
1146 ids.push(id.clone());
1147 if value.is_string() {
1148 fields.push(format!(
1149 "WHEN '{}' THEN '{}'",
1150 id,
1151 value.to_string().replace("'", "''")
1152 ));
1153 } else if value.is_array() || value.is_object() {
1154 if self.params.json[field].is_empty() {
1155 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1156 } else {
1157 let json = value.to_string();
1158 let json = json.replace("'", "''");
1159 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1160 }
1161 continue;
1162 } else if value.is_number() || value.is_boolean() || value.is_null() {
1163 fields.push(format!("WHEN '{id}' THEN {value}"));
1164 } else {
1165 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1166 }
1167 }
1168 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1169 }
1170 self.where_and("id", "in", ids.into());
1171 for (field, value) in self.params.inc_dec.entries() {
1172 values.push(format!("{} = {}", field, value.to_string().clone()));
1173 }
1174
1175 let values = values.join(",");
1176 let sql = format!(
1177 "UPDATE {} SET {} {} {};",
1178 self.params.table.clone(),
1179 values,
1180 self.params.where_sql(),
1181 self.params.page_limit_sql()
1182 );
1183 if self.params.sql {
1184 return JsonValue::from(sql.clone());
1185 }
1186 let (state, data) = self.execute(sql.as_str());
1187 if state {
1188 data
1189 } else {
1190 error!("update_all: {data:?}");
1191 JsonValue::from(0)
1192 }
1193 }
1194
1195 fn delete(&mut self) -> JsonValue {
1196 let sql = format!(
1197 "delete FROM {} {} {};",
1198 self.params.table.clone(),
1199 self.params.where_sql(),
1200 self.params.page_limit_sql()
1201 );
1202 if self.params.sql {
1203 return JsonValue::from(sql.clone());
1204 }
1205 let (state, data) = self.execute(sql.as_str());
1206 match state {
1207 true => data,
1208 false => {
1209 error!("delete 失败>>> {data:?}");
1210 JsonValue::from(0)
1211 }
1212 }
1213 }
1214
1215 fn transaction(&mut self) -> bool {
1216 let thread_id = format!("{:?}", thread::current().id());
1217
1218 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1219 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1220 t += 1;
1221 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1222 return true;
1223 }
1224
1225 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1226 let key = format!("{}{}", self.default, thread_id);
1227
1228 let mut guard = match self.client.get_guard() {
1230 Ok(g) => g,
1231 Err(e) => {
1232 error!("获取事务连接失败: {e}");
1233 return false;
1234 }
1235 };
1236
1237 let conn = guard.conn().clone();
1238
1239 drop(guard);
1241
1242 TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1243
1244 let sql = "START TRANSACTION;".to_string();
1245 let (state, _) = self.execute(sql.as_str());
1246 match state {
1247 true => {
1248 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1249 let (state, _) = self.execute(sql.as_str());
1250 match state {
1251 true => state,
1252 false => {
1253 TRANS.lock().unwrap().remove(&*thread_id.clone());
1254 TR.lock().unwrap().remove(&key.clone());
1255 state
1256 }
1257 }
1258 }
1259 false => {
1260 TRANS.lock().unwrap().remove(&*thread_id.clone());
1261 TR.lock().unwrap().remove(&key.clone());
1262 state
1263 }
1264 }
1265 }
1266 fn commit(&mut self) -> bool {
1267 let thread_id = format!("{:?}", thread::current().id());
1268 let sql = "COMMIT".to_string();
1269
1270 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1271 if t > 1 {
1272 t -= 1;
1273 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1274 return true;
1275 }
1276 let (state, data) = self.execute(sql.as_str());
1277 TRANS.lock().unwrap().remove(&thread_id);
1278 let key = format!("{}{}", self.default, thread_id);
1279 TR.lock().unwrap().remove(&*key);
1280 match state {
1281 true => {}
1282 false => {
1283 error!("提交事务失败: {data}");
1284 }
1285 }
1286 state
1287 }
1288
1289 fn rollback(&mut self) -> bool {
1290 let thread_id = format!("{:?}", thread::current().id());
1291 let sql = "ROLLBACK".to_string();
1292
1293 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1294 if t > 1 {
1295 t -= 1;
1296 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1297 return true;
1298 }
1299 let (state, data) = self.execute(sql.as_str());
1300 TRANS.lock().unwrap().remove(&thread_id);
1301 let key = format!("{}{}", self.default, thread_id);
1302 TR.lock().unwrap().remove(&*key);
1303 match state {
1304 true => {}
1305 false => {
1306 error!("回滚失败: {data}");
1307 }
1308 }
1309 state
1310 }
1311
1312 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1313 let (state, data) = self.query(sql);
1314 match state {
1315 true => Ok(data),
1316 false => Err(data.to_string()),
1317 }
1318 }
1319
1320 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1321 let (state, data) = self.execute(sql);
1322 match state {
1323 true => Ok(data),
1324 false => Err(data.to_string()),
1325 }
1326 }
1327
1328 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1329 self.params.inc_dec[field] = format!("{field} + {num}").into();
1330 self
1331 }
1332
1333 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1334 self.params.inc_dec[field] = format!("{field} - {num}").into();
1335 self
1336 }
1337 fn buildsql(&mut self) -> String {
1338 self.fetch_sql();
1339 let sql = self.select().to_string();
1340 format!("( {} ) {}", sql, self.params.table)
1341 }
1342
1343 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1344 for field in fields {
1345 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1346 }
1347 self
1348 }
1349
1350 fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1351 let main_table = if main_table.is_empty() {
1352 self.params.table.clone()
1353 } else {
1354 main_table.to_string()
1355 };
1356 self.params.join_table = right_table.to_string();
1357 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1358 self
1359 }
1360
1361 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1362 let main_fields = if main_fields.is_empty() {
1363 "id"
1364 } else {
1365 main_fields
1366 };
1367 let second_fields = if second_fields.is_empty() {
1368 self.params.table.clone()
1369 } else {
1370 second_fields.to_string().clone()
1371 };
1372 let sec_table_name = format!("{}{}", table, "_2");
1373 let second_table = format!("{} {}", table, sec_table_name.clone());
1374 self.params.join_table = sec_table_name.clone();
1375 self.params.join.push(format!(
1376 " INNER JOIN {} ON {}.{} = {}.{}",
1377 second_table, self.params.table, main_fields, sec_table_name, second_fields
1378 ));
1379 self
1380 }
1381}