1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use br_pgsql::connect::Connect;
13use br_pgsql::pools::Pools;
14
15lazy_static! {
16 static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
17 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
18}
19#[derive(Clone)]
20pub struct Pgsql {
21 pub connection: Connection,
23 pub default: String,
25 pub params: Params,
26 pub client: Pools,
27}
28
29impl Pgsql {
30 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31 let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
32
33 let cp_connection = connection.clone();
34 let config = object! {
35 debug: cp_connection.debug,
36 username: cp_connection.username,
37 userpass: cp_connection.userpass,
38 database: cp_connection.database,
39 hostname: cp_connection.hostname,
40 hostport: port,
41 charset: cp_connection.charset.str(),
42 };
43 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
44
45 let pools = pgsql.pools()?;
46 Ok(Self {
47 connection,
48 default: default.clone(),
49 params: Params::default("pgsql"),
50 client: pools,
51 })
52 }
53
54 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
55 let thread_id = format!("{:?}", thread::current().id());
56 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
60 let key = format!("{}{}", self.default, thread_id);
62 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
63
64 let mut t = db.lock().unwrap();
65 match t.query(sql) {
66 Ok(e) => {
67 if self.connection.debug {
68 info!("查询成功: {} {}", thread_id.clone(), sql);
69 }
70 (true, e.rows)
71 }
72 Err(e) => {
73 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
74 (false, JsonValue::from(e.to_string()))
75 }
76 }
77 } else {
78 let mut guard = match self.client.get_guard() {
80 Ok(g) => g,
81 Err(e) => {
82 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
83 return (false, JsonValue::from(e.to_string()));
84 }
85 };
86
87 let res = guard.conn().query(sql);
88 match res {
89 Ok(e) => {
90 if self.connection.debug {
91 info!("查询成功: {} {}", thread_id.clone(), sql);
92 }
93 (true, e.rows)
94 }
95 Err(e) => {
96 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
97 (false, JsonValue::from(e.to_string()))
98 }
99 }
100 }
102 }
103 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
104 let thread_id = format!("{:?}", thread::current().id());
105
106 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
107 let key = format!("{}{}", self.default, thread_id);
108 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
109 let mut t = db.lock().unwrap();
110 match t.execute(sql) {
111 Ok(e) => {
112 if self.connection.debug {
113 info!("提交成功: {} {}", thread_id.clone(), sql);
114 }
115 if sql.contains("INSERT") {
116 (true, e.rows)
117 } else {
118 (true, e.affect_count.into())
119 }
120 }
121 Err(e) => {
122 error!("事务提交失败: {thread_id} {e} {sql}");
123 (false, JsonValue::from(e.to_string()))
124 }
125 }
126 } else {
127 let mut guard = match self.client.get_guard() {
129 Ok(g) => g,
130 Err(e) => {
131 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
132 return (false, JsonValue::from(e.to_string()));
133 }
134 };
135
136 let res = guard.conn().execute(sql);
137 match res {
138 Ok(e) => {
139 if self.connection.debug {
140 info!("提交成功: {} {}", thread_id.clone(), sql);
141 }
142 if sql.contains("INSERT") {
143 (true, e.rows)
144 } else {
145 (true, e.affect_count.into())
146 }
147 }
148 Err(e) => {
149 error!("非事务提交失败: {thread_id} {e} {sql}");
150 (false, JsonValue::from(e.to_string()))
151 }
152 }
153 }
155 }
156}
157
158impl DbMode for Pgsql {
159 fn database_tables(&mut self) -> JsonValue {
160 let sql = "SHOW TABLES".to_string();
161 match self.sql(sql.as_str()) {
162 Ok(e) => {
163 let mut list = vec![];
164 for item in e.members() {
165 for (_, value) in item.entries() {
166 list.push(value.clone());
167 }
168 }
169 list.into()
170 }
171 Err(_) => {
172 array![]
173 }
174 }
175 }
176
177 fn database_create(&mut self, name: &str) -> bool {
178 let sql = format!("CREATE DATABASE {name}");
179
180 let (state, data) = self.execute(sql.as_str());
181 match state {
182 true => data.as_bool().unwrap(),
183 false => {
184 error!("创建数据库失败: {data:?}");
185 false
186 }
187 }
188 }
189}
190
191impl Mode for Pgsql {
192 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
193 let mut sql = String::new();
194 let mut comments = vec![];
195
196 if !options.table_unique.is_empty() {
198 let unique = format!(
199 "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
200 options.table_unique.join("_"),
201 options.table_name,
202 options.table_unique.join(",")
203 );
204 comments.push(unique);
205 }
206
207 for row in options.table_index.iter() {
209 let index = format!(
210 "CREATE INDEX {01}_index_{} ON {} ({})",
211 row.join("_"),
212 options.table_name,
213 row.join(",")
214 );
215 comments.push(index);
216 }
217
218 for (name, field) in options.table_fields.entries_mut() {
219 field["table_name"] = options.table_name.clone().into();
220 let row = br_fields::field("pgsql", name, field.clone());
221 let rows = row.split("comment").collect::<Vec<&str>>();
222 comments.push(format!(
223 "COMMENT ON COLUMN {}.{} IS {};",
224 options.table_name, name, rows[1]
225 ));
226 sql = format!("{} {},\r\n", sql, rows[0]);
227 }
228
229 let primary_key = format!(
230 "CONSTRAINT {}_{} PRIMARY KEY ({})",
231 options.table_name, options.table_key, options.table_key
232 );
233 let sql = format!(
234 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
235 options.table_name,
236 sql.trim_end_matches(",\r\n"),
237 primary_key
238 );
239 comments.insert(0, sql);
240
241 for (_name, field) in options.table_fields.entries() {
242 field["mode"].as_str().unwrap();
243 {}
244 }
245
246 if self.params.sql {
247 let info = comments.join("\r\n");
248 return JsonValue::from(info);
249 }
250 for comment in comments {
251 let (state, _) = self.execute(comment.as_str());
252 match state {
253 true => {}
254 false => {
255 return JsonValue::from(state);
256 }
257 }
258 }
259 JsonValue::from(true)
260 }
261
262 fn table_update(&mut self, options: TableOptions) -> JsonValue {
263 let fields_list = self.table_info(&options.table_name);
264 let mut put = vec![];
265 let mut add = vec![];
266 let mut del = vec![];
267 let mut comments = vec![];
268
269 for (key, _) in fields_list.entries() {
270 if options.table_fields[key].is_empty() {
271 del.push(key);
272 }
273 }
274 for (name, field) in options.table_fields.entries() {
275 if !fields_list[name].is_empty() {
276 let old_comment = fields_list[name]["comment"].to_string();
277 let new_comment = br_fields::field("pgsql", name, field.clone());
278 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
279 let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
280 if old_comment == new_comment_text {
281 continue;
282 }
283 put.push(name);
284 } else {
285 add.push(name);
286 }
287 }
288
289 for name in add.iter() {
290 let name = name.to_string();
291 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
292 let rows = row.split("comment").collect::<Vec<&str>>();
293 comments.push(format!(
294 r#"ALTER TABLE "{}" add {};"#,
295 options.table_name, rows[0]
296 ));
297 comments.push(format!(
298 "COMMENT ON COLUMN {}.{} IS {};",
299 options.table_name, name, rows[1]
300 ));
301 }
302 for name in del.iter() {
303 comments.push(format!(
304 "ALTER TABLE {} DROP {};\r\n",
305 options.table_name, name
306 ));
307 }
308 for name in put.iter() {
309 let name = name.to_string();
310 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
311 let rows = row.split("comment").collect::<Vec<&str>>();
312
313 let sql = rows[0].split(" ").collect::<Vec<&str>>();
314
315 if sql[1].contains("BOOLEAN") {
316 let text = format!(
317 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
318 options.table_name, name
319 );
320 comments.push(text.clone());
321 let text = format!(
322 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
323 options.table_name, name, sql[1]
324 );
325 comments.push(text.clone());
326 } else {
327 let text = format!(
328 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
329 options.table_name, name, sql[1]
330 );
331 comments.push(text.clone());
332 };
333
334 if sql.len() > 3 && !sql[3].is_empty() {
335 let text = format!(
336 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
337 options.table_name, name, sql[3]
338 );
339 comments.push(text.clone());
340 }
341
342 comments.push(format!(
343 "COMMENT ON COLUMN {}.{} IS {};",
344 options.table_name, name, rows[1]
345 ));
346 }
347
348 let mut unique_new = vec![];
349 let mut index_new = vec![];
350 let mut primary_key = vec![];
351 let (_, index_list) = self.query(
352 format!(
353 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
354 options.table_name
355 ).as_str(),
356 );
357 for item in index_list.members() {
358 let key_name = item["indexname"].as_str().unwrap();
359 let indexdef = item["indexdef"].to_string();
360
361 if indexdef.contains(
362 format!(
363 "CREATE UNIQUE INDEX {}_{} ON",
364 options.table_name, options.table_key
365 ).as_str(),
366 ) {
367 primary_key.push(key_name.to_string());
368 continue;
369 }
370 if indexdef.contains("CREATE UNIQUE INDEX") {
371 unique_new.push(key_name.to_string());
372 continue;
373 }
374 if indexdef.contains("CREATE INDEX") {
375 index_new.push(key_name.to_string());
376 continue;
377 }
378 }
379
380 if !options.table_unique.is_empty() {
381 let name = format!(
382 "{}_unique_{}",
383 options.table_name,
384 options.table_unique.join("_")
385 );
386 let unique = format!(
387 "CREATE UNIQUE INDEX {} ON {} ({});",
388 name,
389 options.table_name,
390 options.table_unique.join(",")
391 );
392 if !unique_new.contains(&name) {
393 comments.push(unique);
394 } else {
395 unique_new.retain(|x| *x != name);
396 }
397 }
398
399 for row in options.table_index.iter() {
401 let name = format!("{}_index_{}", options.table_name, row.join("_"));
402 let index = format!(
403 "CREATE INDEX {} ON {} ({})",
404 name,
405 options.table_name,
406 row.join(",")
407 );
408 if !index_new.contains(&name) {
409 comments.push(index);
410 } else {
411 index_new.retain(|x| *x != name);
412 }
413 }
414
415 for item in unique_new {
416 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
417 }
418 for item in index_new {
419 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
420 }
421
422 if self.params.sql {
423 return JsonValue::from(comments.join(""));
424 }
425
426 if comments.is_empty() {
427 return JsonValue::from(-1);
428 }
429
430 for item in comments.iter() {
431 let (state, res) = self.execute(item.as_str());
432 match state {
433 true => {}
434 false => {
435 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
436 return JsonValue::from(0);
437 }
438 }
439 }
440 JsonValue::from(1)
441 }
442
443 fn table_info(&mut self, table: &str) -> JsonValue {
444 let sql = format!(
445 "SELECT COL.COLUMN_NAME,
446 COL.DATA_TYPE,
447 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
448 LEFT JOIN
449 pg_catalog.pg_description DESCRIPTION
450 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
451 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
452 let (state, data) = self.query(sql.as_str());
453 let mut list = object! {};
454 if state {
455 for item in data.members() {
456 let mut row = object! {};
457 row["field"] = item["column_name"].clone();
458 row["comment"] = item["comment"].clone();
459 row["type"] = item["data_type"].clone();
460 list[row["field"].as_str().unwrap()] = row.clone();
461 }
462 list
463 } else {
464 list
465 }
466 }
467
468 fn table_is_exist(&mut self, name: &str) -> bool {
469 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
470 let (state, data) = self.query(sql.as_str());
471 match state {
472 true => {
473 for item in data.members() {
474 if item.has_key("exists") {
475 return item["exists"].as_bool().unwrap();
476 }
477 }
478 false
479 }
480 false => false,
481 }
482 }
483
484 fn table(&mut self, name: &str) -> &mut Pgsql {
485 self.params = Params::default(self.connection.mode.str().as_str());
486 self.params.table = format!("{}{}", self.connection.prefix, name);
487 self.params.join_table = self.params.table.clone();
488 self
489 }
490
491 fn change_table(&mut self, name: &str) -> &mut Self {
492 self.params.join_table = name.to_string();
493 self
494 }
495
496 fn autoinc(&mut self) -> &mut Self {
497 self.params.autoinc = true;
498 self
499 }
500
501 fn fetch_sql(&mut self) -> &mut Self {
502 self.params.sql = true;
503 self
504 }
505
506 fn order(&mut self, field: &str, by: bool) -> &mut Self {
507 self.params.order[field] = {
508 if by {
509 "DESC"
510 } else {
511 "ASC"
512 }
513 }.into();
514 self
515 }
516
517 fn group(&mut self, field: &str) -> &mut Self {
518 let fields: Vec<&str> = field.split(",").collect();
519 for field in fields.iter() {
520 let field = field.to_string();
521 self.params.group[field.as_str()] = field.clone().into();
522 self.params.fields[field.as_str()] = field.clone().into();
523 }
524 self
525 }
526
527 fn distinct(&mut self) -> &mut Self {
528 self.params.distinct = true;
529 self
530 }
531
532 fn json(&mut self, field: &str) -> &mut Self {
533 let list: Vec<&str> = field.split(",").collect();
534 for item in list.iter() {
535 self.params.json[item.to_string().as_str()] = item.to_string().into();
536 }
537 self
538 }
539
540 fn location(&mut self, field: &str) -> &mut Self {
541 let list: Vec<&str> = field.split(",").collect();
542 for item in list.iter() {
543 self.params.location[item.to_string().as_str()] = item.to_string().into();
544 }
545 self
546 }
547
548 fn field(&mut self, field: &str) -> &mut Self {
549 let list: Vec<&str> = field.split(",").collect();
550 let join_table = if self.params.join_table.is_empty() {
551 self.params.table.clone()
552 } else {
553 self.params.join_table.clone()
554 };
555 for item in list.iter() {
556 if item.contains(" as ") {
557 let text = item.split(" as ").collect::<Vec<&str>>();
558 if text[0].contains("count(") {
559 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
560 } else {
561 self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
562 }
563 } else {
564 self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
565 }
566 }
567 self
568 }
569
570 fn hidden(&mut self, name: &str) -> &mut Self {
571 let hidden: Vec<&str> = name.split(",").collect();
572
573 let fields_list = self.table_info(self.params.clone().table.as_str());
574 let mut data = array![];
575 for item in fields_list.members() {
576 data.push(object! {
577 "name":item["field"].as_str().unwrap()
578 }).unwrap();
579 }
580
581 for item in data.members() {
582 let name = item["name"].as_str().unwrap();
583 if !hidden.contains(&name) {
584 self.params.fields[name] = name.into();
585 }
586 }
587 self
588 }
589
590 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
591 let join_table = if self.params.join_table.is_empty() {
592 self.params.table.clone()
593 } else {
594 self.params.join_table.clone()
595 };
596 if value.is_boolean() {
597 if value.as_bool().unwrap() {
598 value = 1.into();
599 } else {
600 value = 0.into();
601 }
602 }
603 match compare {
604 "between" => {
605 self.params.where_and.push(format!(
606 "{}.{} between '{}' AND '{}'",
607 join_table, field, value[0], value[1]
608 ));
609 }
610 "set" => {
611 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
612 let mut wheredata = vec![];
613 for item in list.iter() {
614 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
615 }
616 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
617 }
618 "notin" => {
619 let mut text = String::new();
620 for item in value.members() {
621 text = format!("{text},'{item}'");
622 }
623 text = text.trim_start_matches(",").into();
624 self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
625 }
626 "is" => {
627 self.params.where_and.push(format!("{join_table}.{field} is {value}"));
628 }
629 "isnot" => {
630 self.params.where_and.push(format!("{join_table}.{field} is not {value}"));
631 }
632 "notlike" => {
633 self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
634 }
635 "in" => {
636 let mut text = String::new();
637 if value.is_array() {
638 for item in value.members() {
639 text = format!("{text},'{item}'");
640 }
641 } else if value.is_null() {
642 text = format!("{text},null");
643 } else {
644 let value = value.as_str().unwrap();
645
646 let value: Vec<&str> = value.split(",").collect();
647 for item in value.iter() {
648 text = format!("{text},'{item}'");
649 }
650 }
651 text = text.trim_start_matches(",").into();
652
653 self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
654 }
655 _ => {
656 self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
657 }
658 }
659 self
660 }
661
662 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
663 let join_table = if self.params.join_table.is_empty() {
664 self.params.table.clone()
665 } else {
666 self.params.join_table.clone()
667 };
668
669 if value.is_boolean() {
670 if value.as_bool().unwrap() {
671 value = 1.into();
672 } else {
673 value = 0.into();
674 }
675 }
676
677 match compare {
678 "between" => {
679 self.params.where_or.push(format!(
680 "{}.{} between '{}' AND '{}'",
681 join_table, field, value[0], value[1]
682 ));
683 }
684 "set" => {
685 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
686 let mut wheredata = vec![];
687 for item in list.iter() {
688 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
689 }
690 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
691 }
692 "notin" => {
693 let mut text = String::new();
694 for item in value.members() {
695 text = format!("{text},'{item}'");
696 }
697 text = text.trim_start_matches(",").into();
698 self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
699 }
700 "is" => {
701 self.params.where_or.push(format!("{join_table}.{field} is {value}"));
702 }
703 "isnot" => {
704 self.params.where_or.push(format!("{join_table}.{field} is not {value}"));
705 }
706 "in" => {
707 let mut text = String::new();
708 if value.is_array() {
709 for item in value.members() {
710 text = format!("{text},'{item}'");
711 }
712 } else {
713 let value = value.as_str().unwrap();
714 let value: Vec<&str> = value.split(",").collect();
715 for item in value.iter() {
716 text = format!("{text},'{item}'");
717 }
718 }
719 text = text.trim_start_matches(",").into();
720 self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
721 }
722 _ => {
723 self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
724 }
725 }
726 self
727 }
728
729 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
730 self.params.where_column = format!(
731 "{}.{} {} {}.{}",
732 self.params.table, field_a, compare, self.params.table, field_b
733 );
734 self
735 }
736
737 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
738 self.params.update_column.push(format!("{field_a} = {compare}"));
739 self
740 }
741
742 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
743 self.params.page = page;
744 self.params.limit = limit;
745 self
746 }
747
748 fn column(&mut self, field: &str) -> JsonValue {
749 self.field(field);
750 let sql = self.params.select_sql();
751
752 if self.params.sql {
753 return JsonValue::from(sql);
754 }
755 let (state, data) = self.query(sql.as_str());
756 match state {
757 true => {
758 let mut list = array![];
759 for item in data.members() {
760 if self.params.json[field].is_empty() {
761 list.push(item[field].clone()).unwrap();
762 } else {
763 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
764 list.push(data).unwrap();
765 }
766 }
767 list
768 }
769 false => {
770 array![]
771 }
772 }
773 }
774
775 fn count(&mut self) -> JsonValue {
776 self.params.fields["count"] = "count(*) as count".to_string().into();
777 let sql = self.params.select_sql();
778 if self.params.sql {
779 return JsonValue::from(sql.clone());
780 }
781 let (state, data) = self.query(sql.as_str());
782 if state {
783 data[0]["count"].clone()
784 } else {
785 JsonValue::from(0)
786 }
787 }
788
789 fn max(&mut self, field: &str) -> JsonValue {
790 self.params.fields[field] = format!("max({field}) as {field}").into();
791 let sql = self.params.select_sql();
792 if self.params.sql {
793 return JsonValue::from(sql.clone());
794 }
795 let (state, data) = self.query(sql.as_str());
796 if state {
797 if data.len() > 1 {
798 return data.clone();
799 }
800 data[0][field].clone()
801 } else {
802 JsonValue::from(0)
803 }
804 }
805
806 fn min(&mut self, field: &str) -> JsonValue {
807 self.params.fields[field] = format!("min({field}) as {field}").into();
808 let sql = self.params.select_sql();
809 if self.params.sql {
810 return JsonValue::from(sql.clone());
811 }
812 let (state, data) = self.query(sql.as_str());
813 if state {
814 if data.len() > 1 {
815 return data;
816 }
817 data[0][field].clone()
818 } else {
819 JsonValue::from(0)
820 }
821 }
822
823 fn sum(&mut self, field: &str) -> JsonValue {
824 self.params.fields[field] = format!("sum({field}) as {field}").into();
825 let sql = self.params.select_sql();
826 if self.params.sql {
827 return JsonValue::from(sql.clone());
828 }
829 let (state, data) = self.query(sql.as_str());
830 match state {
831 true => {
832 if data.len() > 1 {
833 return data;
834 }
835 data[0][field].clone()
836 }
837 false => JsonValue::from(0),
838 }
839 }
840
841 fn avg(&mut self, field: &str) -> JsonValue {
842 self.params.fields[field] = format!("avg({field}) as {field}").into();
843 let sql = self.params.select_sql();
844 if self.params.sql {
845 return JsonValue::from(sql.clone());
846 }
847 let (state, data) = self.query(sql.as_str());
848 if state {
849 if data.len() > 1 {
850 return data;
851 }
852 data[0][field].clone()
853 } else {
854 JsonValue::from(0)
855 }
856 }
857
858 fn select(&mut self) -> JsonValue {
859 let sql = self.params.select_sql();
860 if self.params.sql {
861 return JsonValue::from(sql.clone());
862 }
863 let (state, mut data) = self.query(sql.as_str());
864 match state {
865 true => {
866 for (field, _) in self.params.json.entries() {
867 for item in data.members_mut() {
868 if !item[field].is_empty() {
869 let json = item[field].to_string();
870 item[field] = match json::parse(&json) {
871 Ok(e) => e,
872 Err(_) => JsonValue::from(json),
873 };
874 }
875 }
876 }
877 data.clone()
878 }
879 false => array![],
880 }
881 }
882
883 fn find(&mut self) -> JsonValue {
884 self.params.page = 1;
885 self.params.limit = 1;
886 let sql = self.params.select_sql();
887 if self.params.sql {
888 return JsonValue::from(sql.clone());
889 }
890 let (state, mut data) = self.query(sql.as_str());
891 match state {
892 true => {
893 if data.is_empty() {
894 return object! {};
895 }
896 for (field, _) in self.params.json.entries() {
897 if !data[0][field].is_empty() {
898 let json = data[0][field].to_string();
899 let json = json::parse(&json).unwrap_or(array![]);
900 data[0][field] = json;
901 } else {
902 data[0][field] = array![];
903 }
904 }
905 data[0].clone()
906 }
907 false => {
908 object! {}
909 }
910 }
911 }
912
913 fn value(&mut self, field: &str) -> JsonValue {
914 self.params.fields = object! {};
915 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
916 self.params.page = 1;
917 self.params.limit = 1;
918 let sql = self.params.select_sql();
919 if self.params.sql {
920 return JsonValue::from(sql.clone());
921 }
922 let (state, mut data) = self.query(sql.as_str());
923 match state {
924 true => {
925 for (field, _) in self.params.json.entries() {
926 if !data[0][field].is_empty() {
927 let json = data[0][field].to_string();
928 let json = json::parse(&json).unwrap_or(array![]);
929 data[0][field] = json;
930 } else {
931 data[0][field] = array![];
932 }
933 }
934 data[0][field].clone()
935 }
936 false => {
937 if self.connection.debug {
938 info!("{data:?}");
939 }
940 JsonValue::Null
941 }
942 }
943 }
944
945 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
946 let mut fields = vec![];
947 let mut values = vec![];
948 if !self.params.autoinc && data["id"].is_empty() {
949 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
950 }
951 for (field, value) in data.entries() {
952 fields.push(field);
953
954 if value.is_string() {
955 values.push(format!("'{}'", value.to_string().replace("'", "''")));
956 continue;
957 } else if value.is_array() {
958 if self.params.json[field].is_empty() {
959 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
960 values.push(format!("'{array}'"));
961 } else {
962 let json = value.to_string();
963 let json = json.replace("'", "''");
964 values.push(format!("'{json}'"));
965 }
966 continue;
967 } else if value.is_object() {
968 if self.params.json[field].is_empty() {
969 values.push(format!("'{value}'"));
970 } else {
971 let json = value.to_string();
972 let json = json.replace("'", "''");
973 values.push(format!("'{json}'"));
974 }
975 continue;
976 } else if value.is_number() || value.is_boolean() || value.is_null() {
977 values.push(format!("{value}"));
978 continue;
979 } else {
980 values.push(format!("'{value}'"));
981 continue;
982 }
983 }
984 let fields = fields.join(",");
985 let values = values.join(",");
986
987 let sql = format!(
988 "INSERT INTO {} ({}) VALUES ({});",
989 self.params.table, fields, values
990 );
991 if self.params.sql {
992 return JsonValue::from(sql.clone());
993 }
994 let (state, ids) = self.execute(sql.as_str());
995
996 match state {
997 true => match self.params.autoinc {
998 true => ids.clone(),
999 false => data["id"].clone(),
1000 },
1001 false => {
1002 let thread_id = format!("{:?}", thread::current().id());
1003 error!("添加失败: {thread_id} {ids:?} {sql}");
1004 JsonValue::from("")
1005 }
1006 }
1007 }
1008 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1009 let mut fields = String::new();
1010 if !self.params.autoinc && data[0]["id"].is_empty() {
1011 data[0]["id"] = "".into();
1012 }
1013 for (field, _) in data[0].entries() {
1014 fields = format!("{fields},{field}");
1015 }
1016 fields = fields.trim_start_matches(",").parse().unwrap();
1017
1018 let core_count = num_cpus::get();
1019 let mut p = pools::Pool::new(core_count * 4);
1020
1021 let autoinc = self.params.autoinc;
1022 for list in data.members() {
1023 let mut item = list.clone();
1024 let i = br_fields::str::Code::verification_code(3);
1025 p.execute(move |pcindex| {
1026 if !autoinc && item["id"].is_empty() {
1027 let id = format!(
1028 "{:X}{:X}{}",
1029 Local::now().timestamp_nanos_opt().unwrap(),
1030 pcindex,
1031 i
1032 );
1033 item["id"] = id.into();
1034 }
1035 let mut values = "".to_string();
1036 for (_, value) in item.entries() {
1037 if value.is_string() {
1038 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1039 } else if value.is_number() {
1040 values = format!("{values},{value}");
1041 } else if value.is_boolean() {
1042 values = format!("{values},{value}");
1043 continue;
1044 } else {
1045 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1046 }
1047 }
1048 values = format!("({})", values.trim_start_matches(","));
1049 array![item["id"].clone(), values]
1050 });
1051 }
1052 let (ids_list, mut values) = p.insert_all();
1053 values = values.trim_start_matches(",").parse().unwrap();
1054 let sql = format!(
1055 "INSERT INTO {} ({}) VALUES {};",
1056 self.params.table, fields, values
1057 );
1058
1059 if self.params.sql {
1060 return JsonValue::from(sql.clone());
1061 }
1062 let (state, data) = self.execute(sql.as_str());
1063 match state {
1064 true => match autoinc {
1065 true => data,
1066 false => JsonValue::from(ids_list),
1067 },
1068 false => {
1069 error!("insert_all: {data:?}");
1070 array![]
1071 }
1072 }
1073 }
1074 fn update(&mut self, data: JsonValue) -> JsonValue {
1075 let mut values = vec![];
1076 for (field, value) in data.entries() {
1077 if value.is_string() {
1078 values.push(format!(
1079 "{}='{}'",
1080 field,
1081 value.to_string().replace("'", "''")
1082 ));
1083 } else if value.is_number() {
1084 values.push(format!("{field}= {value}"));
1085 } else if value.is_array() {
1086 if self.params.json[field].is_empty() {
1087 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1088 values.push(format!("{field}='{array}'"));
1089 } else {
1090 let json = value.to_string();
1091 let json = json.replace("'", "''");
1092 values.push(format!("{field}='{json}'"));
1093 }
1094 continue;
1095 } else if value.is_object() {
1096 if self.params.json[field].is_empty() {
1097 values.push(format!("{field}='{value}'"));
1098 } else {
1099 if value.is_empty() {
1100 values.push(format!("{field}=''"));
1101 continue;
1102 }
1103 let json = value.to_string();
1104 let json = json.replace("'", "''");
1105 values.push(format!("{field}='{json}'"));
1106 }
1107 continue;
1108 } else if value.is_boolean() {
1109 values.push(format!("{field}= {value}"));
1110 } else {
1111 values.push(format!("{field}=\"{value}\""));
1112 }
1113 }
1114
1115 for (field, value) in self.params.inc_dec.entries() {
1116 values.push(format!("{} = {}", field, value.to_string().clone()));
1117 }
1118 if !self.params.update_column.is_empty() {
1119 values.extend(self.params.update_column.clone());
1120 }
1121 let values = values.join(",");
1122
1123 let sql = format!(
1124 "UPDATE {} SET {} {};",
1125 self.params.table.clone(),
1126 values,
1127 self.params.where_sql()
1128 );
1129 if self.params.sql {
1130 return JsonValue::from(sql.clone());
1131 }
1132 let (state, data) = self.execute(sql.as_str());
1133 if state {
1134 data
1135 } else {
1136 let thread_id = format!("{:?}", thread::current().id());
1137 error!("update: {thread_id} {data:?} {sql}");
1138 0.into()
1139 }
1140 }
1141 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1142 let mut values = vec![];
1143
1144 let mut ids = vec![];
1145 for (field, _) in data[0].entries() {
1146 if field == "id" {
1147 continue;
1148 }
1149 let mut fields = vec![];
1150 for row in data.members() {
1151 let value = row[field].clone();
1152 let id = row["id"].clone();
1153 ids.push(id.clone());
1154 if value.is_string() {
1155 fields.push(format!(
1156 "WHEN '{}' THEN '{}'",
1157 id,
1158 value.to_string().replace("'", "''")
1159 ));
1160 } else if value.is_array() || value.is_object() {
1161 if self.params.json[field].is_empty() {
1162 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1163 } else {
1164 let json = value.to_string();
1165 let json = json.replace("'", "''");
1166 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1167 }
1168 continue;
1169 } else if value.is_number() || value.is_boolean() || value.is_null() {
1170 fields.push(format!("WHEN '{id}' THEN {value}"));
1171 } else {
1172 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1173 }
1174 }
1175 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1176 }
1177 self.where_and("id", "in", ids.into());
1178 for (field, value) in self.params.inc_dec.entries() {
1179 values.push(format!("{} = {}", field, value.to_string().clone()));
1180 }
1181
1182 let values = values.join(",");
1183 let sql = format!(
1184 "UPDATE {} SET {} {} {};",
1185 self.params.table.clone(),
1186 values,
1187 self.params.where_sql(),
1188 self.params.page_limit_sql()
1189 );
1190 if self.params.sql {
1191 return JsonValue::from(sql.clone());
1192 }
1193 let (state, data) = self.execute(sql.as_str());
1194 if state {
1195 data
1196 } else {
1197 error!("update_all: {data:?}");
1198 JsonValue::from(0)
1199 }
1200 }
1201
1202 fn delete(&mut self) -> JsonValue {
1203 let sql = format!(
1204 "delete FROM {} {} {};",
1205 self.params.table.clone(),
1206 self.params.where_sql(),
1207 self.params.page_limit_sql()
1208 );
1209 if self.params.sql {
1210 return JsonValue::from(sql.clone());
1211 }
1212 let (state, data) = self.execute(sql.as_str());
1213 match state {
1214 true => data,
1215 false => {
1216 error!("delete 失败>>> {data:?}");
1217 JsonValue::from(0)
1218 }
1219 }
1220 }
1221
1222 fn transaction(&mut self) -> bool {
1223 let thread_id = format!("{:?}", thread::current().id());
1224
1225 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1226 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1227 t += 1;
1228 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1229 return true;
1230 }
1231
1232 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1233 let key = format!("{}{}", self.default, thread_id);
1234
1235 let mut guard = match self.client.get_guard() {
1237 Ok(g) => g,
1238 Err(e) => {
1239 error!("获取事务连接失败: {e}");
1240 return false;
1241 }
1242 };
1243
1244 let conn = guard.conn().clone();
1245
1246 drop(guard);
1248
1249 TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1250
1251 let sql = "START TRANSACTION;".to_string();
1252 let (state, _) = self.execute(sql.as_str());
1253 match state {
1254 true => {
1255 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1256 let (state, _) = self.execute(sql.as_str());
1257 match state {
1258 true => state,
1259 false => {
1260 TRANS.lock().unwrap().remove(&*thread_id.clone());
1261 TR.lock().unwrap().remove(&key.clone());
1262 state
1263 }
1264 }
1265 }
1266 false => {
1267 TRANS.lock().unwrap().remove(&*thread_id.clone());
1268 TR.lock().unwrap().remove(&key.clone());
1269 state
1270 }
1271 }
1272 }
1273 fn commit(&mut self) -> bool {
1274 let thread_id = format!("{:?}", thread::current().id());
1275 let sql = "COMMIT".to_string();
1276
1277 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1278 if t > 1 {
1279 t -= 1;
1280 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1281 return true;
1282 }
1283 let (state, data) = self.execute(sql.as_str());
1284 TRANS.lock().unwrap().remove(&thread_id);
1285 let key = format!("{}{}", self.default, thread_id);
1286 TR.lock().unwrap().remove(&*key);
1287 match state {
1288 true => {}
1289 false => {
1290 error!("提交事务失败: {data}");
1291 }
1292 }
1293 state
1294 }
1295
1296 fn rollback(&mut self) -> bool {
1297 let thread_id = format!("{:?}", thread::current().id());
1298 let sql = "ROLLBACK".to_string();
1299
1300 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1301 if t > 1 {
1302 t -= 1;
1303 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1304 return true;
1305 }
1306 let (state, data) = self.execute(sql.as_str());
1307 TRANS.lock().unwrap().remove(&thread_id);
1308 let key = format!("{}{}", self.default, thread_id);
1309 TR.lock().unwrap().remove(&*key);
1310 match state {
1311 true => {}
1312 false => {
1313 error!("回滚失败: {data}");
1314 }
1315 }
1316 state
1317 }
1318
1319 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1320 let (state, data) = self.query(sql);
1321 match state {
1322 true => Ok(data),
1323 false => Err(data.to_string()),
1324 }
1325 }
1326
1327 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1328 let (state, data) = self.execute(sql);
1329 match state {
1330 true => Ok(data),
1331 false => Err(data.to_string()),
1332 }
1333 }
1334
1335 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1336 self.params.inc_dec[field] = format!("{field} + {num}").into();
1337 self
1338 }
1339
1340 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1341 self.params.inc_dec[field] = format!("{field} - {num}").into();
1342 self
1343 }
1344 fn buildsql(&mut self) -> String {
1345 self.fetch_sql();
1346 let sql = self.select().to_string();
1347 format!("( {} ) {}", sql, self.params.table)
1348 }
1349
1350 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1351 for field in fields {
1352 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1353 }
1354 self
1355 }
1356
1357 fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1358 let main_table = if main_table.is_empty() {
1359 self.params.table.clone()
1360 } else {
1361 main_table.to_string()
1362 };
1363 self.params.join_table = right_table.to_string();
1364 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1365 self
1366 }
1367
1368 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1369 let main_fields = if main_fields.is_empty() {
1370 "id"
1371 } else {
1372 main_fields
1373 };
1374 let second_fields = if second_fields.is_empty() {
1375 self.params.table.clone()
1376 } else {
1377 second_fields.to_string().clone()
1378 };
1379 let sec_table_name = format!("{}{}", table, "_2");
1380 let second_table = format!("{} {}", table, sec_table_name.clone());
1381 self.params.join_table = sec_table_name.clone();
1382 self.params.join.push(format!(
1383 " INNER JOIN {} ON {}.{} = {}.{}",
1384 second_table, self.params.table, main_fields, sec_table_name, second_fields
1385 ));
1386 self
1387 }
1388}