1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use rand::{rng, Rng};
13use br_pgsql::connect::Connect;
14use br_pgsql::pools::Pools;
15
16lazy_static! {
17 static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
18 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
19}
20#[derive(Clone)]
21pub struct Pgsql {
22 pub connection: Connection,
24 pub default: String,
26 pub params: Params,
27 pub client: Pools,
28}
29
30impl Pgsql {
31 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
32 let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {:?}", e))?;
33
34 let cp_connection = connection.clone();
35 let config = object! {
36 debug: cp_connection.debug,
37 username: cp_connection.username,
38 userpass: cp_connection.userpass,
39 database: cp_connection.database,
40 hostname: cp_connection.hostname,
41 hostport: port,
42 charset: cp_connection.charset.str(),
43 };
44 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
45
46 let pools = pgsql.pools()?;
47 Ok(Self {
48 connection,
49 default: default.clone(),
50 params: Params::default("pgsql"),
51 client: pools,
52 })
53 }
54
55 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
56 let thread_id = format!("{:?}", thread::current().id());
57 let _key = format!("{}{}", self.default, thread_id);
58 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
59 let key = format!("{}{}", self.default, thread_id);
60 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
61 let mut t = db.lock().unwrap();
62 match t.query(sql) {
63 Ok(e) => {
64 if self.connection.debug {
65 info!("查询成功: {} {}", thread_id.clone(), sql);
66 }
67 (true, e.rows)
68 }
69 Err(e) => {
70 error!("事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
71 (false, JsonValue::from(e.to_string()))
72 }
73 }
74 } else {
75 let mut db = match self.client.get_connect() {
76 Ok(c) => c,
77 Err(e) => {
78 error!("非事务查询失败: get_connect 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
79 return (false, JsonValue::from(e.to_string()));
80 }
81 };
82 match db.query(sql) {
83 Ok(e) => {
84 if self.connection.debug {
85 info!("查询成功: {} {}", thread_id.clone(), sql);
86 }
87 (true, e.rows)
88 }
89 Err(e) => {
90 error!("非事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
91 (false, JsonValue::from(e.to_string()))
92 }
93 }
94 }
95 }
96 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
97 let thread_id = format!("{:?}", thread::current().id());
98
99 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
100 let key = format!("{}{}", self.default, thread_id);
101 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
102 let mut t = db.lock().unwrap();
103 match t.execute(sql) {
104 Ok(e) => {
105 if self.connection.debug {
106 info!("提交成功: {} {}", thread_id.clone(), sql);
107 }
108 if sql.contains("INSERT") {
109 (true, e.rows)
110 } else {
111 (true, e.affect_count.into())
112 }
113 }
114 Err(e) => {
115 error!("事务提交失败: {} {} {}", thread_id, e, sql);
116 (false, JsonValue::from(e.to_string()))
117 }
118 }
119 } else {
120 let mut db = self.client.get_connect().unwrap();
121 match db.execute(sql) {
122 Ok(e) => {
123 if self.connection.debug {
124 info!("提交成功: {} {}", thread_id.clone(), sql);
125 }
126 if sql.contains("INSERT") {
127 (true, e.rows)
128 } else {
129 (true, e.affect_count.into())
130 }
131 }
132 Err(e) => {
133 error!("非事务提交失败: {} {} {}", thread_id, e, sql);
134 (false, JsonValue::from(e.to_string()))
135 }
136 }
137 }
138 }
139}
140
141impl DbMode for Pgsql {
142 fn database_tables(&mut self) -> JsonValue {
143 let sql = "SHOW TABLES".to_string();
144 match self.sql(sql.as_str()) {
145 Ok(e) => {
146 let mut list = vec![];
147 for item in e.members() {
148 for (_, value) in item.entries() {
149 list.push(value.clone());
150 }
151 }
152 list.into()
153 }
154 Err(_) => {
155 array![]
156 }
157 }
158 }
159
160 fn database_create(&mut self, name: &str) -> bool {
161 let sql = format!("CREATE DATABASE {}", name);
162
163 let (state, data) = self.execute(sql.as_str());
164 match state {
165 true => data.as_bool().unwrap(),
166 false => {
167 error!("创建数据库失败: {:?}", data);
168 false
169 }
170 }
171 }
172}
173
174impl Mode for Pgsql {
175 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
176 let mut sql = String::new();
177 let mut comments = vec![];
178
179 if !options.table_unique.is_empty() {
181 let unique = format!(
182 "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
183 options.table_unique.join("_"),
184 options.table_name,
185 options.table_unique.join(",")
186 );
187 comments.push(unique);
188 }
189
190 for row in options.table_index.iter() {
192 let index = format!(
193 "CREATE INDEX {01}_index_{} ON {} ({})",
194 row.join("_"),
195 options.table_name,
196 row.join(",")
197 );
198 comments.push(index);
199 }
200
201 for (name, field) in options.table_fields.entries_mut() {
202 field["table_name"] = options.table_name.clone().into();
203 let row = br_fields::field("pgsql", name, field.clone());
204 let rows = row.split("comment").collect::<Vec<&str>>();
205 comments.push(format!(
206 "COMMENT ON COLUMN {}.{} IS {};",
207 options.table_name, name, rows[1]
208 ));
209 sql = format!("{} {},\r\n", sql, rows[0]);
210 }
211
212 let primary_key = format!(
213 "CONSTRAINT {}_{} PRIMARY KEY ({})",
214 options.table_name, options.table_key, options.table_key
215 );
216 let sql = format!(
217 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
218 options.table_name,
219 sql.trim_end_matches(",\r\n"),
220 primary_key
221 );
222 comments.insert(0, sql);
223
224 for (_name, field) in options.table_fields.entries() {
225 field["mode"].as_str().unwrap();
226 {}
227 }
228
229 if self.params.sql {
230 let info = comments.join("\r\n");
231 return JsonValue::from(info);
232 }
233 for comment in comments {
234 let (state, _) = self.execute(comment.as_str());
235 match state {
236 true => {}
237 false => {
238 return JsonValue::from(state);
239 }
240 }
241 }
242 JsonValue::from(true)
243 }
244
245 fn table_update(&mut self, options: TableOptions) -> JsonValue {
246 let fields_list = self.table_info(&options.table_name);
247 let mut put = vec![];
248 let mut add = vec![];
249 let mut del = vec![];
250 let mut comments = vec![];
251
252 for (key, _) in fields_list.entries() {
253 if options.table_fields[key].is_empty() {
254 del.push(key);
255 }
256 }
257 for (name, field) in options.table_fields.entries() {
258 if !fields_list[name].is_empty() {
259 let old_comment = fields_list[name]["comment"].to_string();
260 let new_comment = br_fields::field("pgsql", name, field.clone());
261 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
262 let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
263 if old_comment == new_comment_text {
264 continue;
265 }
266 put.push(name);
267 } else {
268 add.push(name);
269 }
270 }
271
272 for name in add.iter() {
273 let name = name.to_string();
274 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
275 let rows = row.split("comment").collect::<Vec<&str>>();
276 comments.push(format!(
277 r#"ALTER TABLE "{}" add {};"#,
278 options.table_name, rows[0]
279 ));
280 comments.push(format!(
281 "COMMENT ON COLUMN {}.{} IS {};",
282 options.table_name, name, rows[1]
283 ));
284 }
285 for name in del.iter() {
286 comments.push(format!(
287 "ALTER TABLE {} DROP {};\r\n",
288 options.table_name, name
289 ));
290 }
291 for name in put.iter() {
292 let name = name.to_string();
293 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
294 let rows = row.split("comment").collect::<Vec<&str>>();
295
296 let sql = rows[0].split(" ").collect::<Vec<&str>>();
297
298 if sql[1].contains("BOOLEAN") {
299 let text = format!(
300 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
301 options.table_name, name
302 );
303 comments.push(text.clone());
304 let text = format!(
305 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
306 options.table_name, name, sql[1]
307 );
308 comments.push(text.clone());
309 } else {
310 let text = format!(
311 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
312 options.table_name, name, sql[1]
313 );
314 comments.push(text.clone());
315 };
316
317 if sql.len() > 3 && !sql[3].is_empty() {
318 let text = format!(
319 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
320 options.table_name, name, sql[3]
321 );
322 comments.push(text.clone());
323 }
324
325 comments.push(format!(
326 "COMMENT ON COLUMN {}.{} IS {};",
327 options.table_name, name, rows[1]
328 ));
329 }
330
331 let mut unique_new = vec![];
332 let mut index_new = vec![];
333 let mut primary_key = vec![];
334 let (_, index_list) = self.query(
335 format!(
336 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
337 options.table_name
338 ).as_str(),
339 );
340 for item in index_list.members() {
341 let key_name = item["indexname"].as_str().unwrap();
342 let indexdef = item["indexdef"].to_string();
343
344 if indexdef.contains(
345 format!(
346 "CREATE UNIQUE INDEX {}_{} ON",
347 options.table_name, options.table_key
348 ).as_str(),
349 ) {
350 primary_key.push(key_name.to_string());
351 continue;
352 }
353 if indexdef.contains("CREATE UNIQUE INDEX") {
354 unique_new.push(key_name.to_string());
355 continue;
356 }
357 if indexdef.contains("CREATE INDEX") {
358 index_new.push(key_name.to_string());
359 continue;
360 }
361 }
362
363 if !options.table_unique.is_empty() {
364 let name = format!(
365 "{}_unique_{}",
366 options.table_name,
367 options.table_unique.join("_")
368 );
369 let unique = format!(
370 "CREATE UNIQUE INDEX {} ON {} ({});",
371 name,
372 options.table_name,
373 options.table_unique.join(",")
374 );
375 if !unique_new.contains(&name) {
376 comments.push(unique);
377 } else {
378 unique_new.retain(|x| *x != name);
379 }
380 }
381
382 for row in options.table_index.iter() {
384 let name = format!("{}_index_{}", options.table_name, row.join("_"));
385 let index = format!(
386 "CREATE INDEX {} ON {} ({})",
387 name,
388 options.table_name,
389 row.join(",")
390 );
391 if !index_new.contains(&name) {
392 comments.push(index);
393 } else {
394 index_new.retain(|x| *x != name);
395 }
396 }
397
398 for item in unique_new {
399 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
400 }
401 for item in index_new {
402 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
403 }
404
405 if self.params.sql {
406 return JsonValue::from(comments.join(""));
407 }
408
409 if comments.is_empty() {
410 return JsonValue::from(-1);
411 }
412
413 for item in comments.iter() {
414 let (state, res) = self.execute(item.as_str());
415 match state {
416 true => {}
417 false => {
418 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
419 return JsonValue::from(0);
420 }
421 }
422 }
423 JsonValue::from(1)
424 }
425
426 fn table_info(&mut self, table: &str) -> JsonValue {
427 let sql = format!(
428 "SELECT COL.COLUMN_NAME,
429 COL.DATA_TYPE,
430 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
431 LEFT JOIN
432 pg_catalog.pg_description DESCRIPTION
433 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
434 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{}'",
435 table
436 );
437 let (state, data) = self.query(sql.as_str());
438 let mut list = object! {};
439 if state {
440 for item in data.members() {
441 let mut row = object! {};
442 row["field"] = item["column_name"].clone();
443 row["comment"] = item["comment"].clone();
444 row["type"] = item["data_type"].clone();
445 list[row["field"].as_str().unwrap()] = row.clone();
446 }
447 list
448 } else {
449 list
450 }
451 }
452
453 fn table_is_exist(&mut self, name: &str) -> bool {
454 let sql = format!(
455 "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{}')",
456 name
457 );
458 let (state, data) = self.query(sql.as_str());
459 match state {
460 true => {
461 for item in data.members() {
462 if item.has_key("exists") {
463 return item["exists"].as_bool().unwrap();
464 }
465 }
466 false
467 }
468 false => false,
469 }
470 }
471
472 fn table(&mut self, name: &str) -> &mut Pgsql {
473 self.params = Params::default(self.connection.mode.str().as_str());
474 self.params.table = format!("{}{}", self.connection.prefix, name);
475 self.params.join_table = self.params.table.clone();
476 self
477 }
478
479 fn change_table(&mut self, name: &str) -> &mut Self {
480 self.params.join_table = name.to_string();
481 self
482 }
483
484 fn autoinc(&mut self) -> &mut Self {
485 self.params.autoinc = true;
486 self
487 }
488
489 fn fetch_sql(&mut self) -> &mut Self {
490 self.params.sql = true;
491 self
492 }
493
494 fn order(&mut self, field: &str, by: bool) -> &mut Self {
495 self.params.order[field] = {
496 if by {
497 "DESC"
498 } else {
499 "ASC"
500 }
501 }.into();
502 self
503 }
504
505 fn group(&mut self, field: &str) -> &mut Self {
506 let fields: Vec<&str> = field.split(",").collect();
507 for field in fields.iter() {
508 let field = field.to_string();
509 self.params.group[field.as_str()] = field.clone().into();
510 self.params.fields[field.as_str()] = field.clone().into();
511 }
512 self
513 }
514
515 fn distinct(&mut self) -> &mut Self {
516 self.params.distinct = true;
517 self
518 }
519
520 fn json(&mut self, field: &str) -> &mut Self {
521 let list: Vec<&str> = field.split(",").collect();
522 for item in list.iter() {
523 self.params.json[item.to_string().as_str()] = item.to_string().into();
524 }
525 self
526 }
527
528 fn field(&mut self, field: &str) -> &mut Self {
529 let list: Vec<&str> = field.split(",").collect();
530 let join_table = if self.params.join_table.is_empty() {
531 self.params.table.clone()
532 } else {
533 self.params.join_table.clone()
534 };
535 for item in list.iter() {
536 if item.contains(" as ") {
537 let text = item.split(" as ").collect::<Vec<&str>>();
538 if text[0].contains("count(") {
539 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
540 } else {
541 self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
542 }
543 } else {
544 self.params.fields[item.to_string().as_str()] = format!("{}.{}", join_table, item).into();
545 }
546 }
547 self
548 }
549
550 fn hidden(&mut self, name: &str) -> &mut Self {
551 let hidden: Vec<&str> = name.split(",").collect();
552
553 let fields_list = self.table_info(self.params.clone().table.as_str());
554 let mut data = array![];
555 for item in fields_list.members() {
556 data.push(object! {
557 "name":item["field"].as_str().unwrap()
558 }).unwrap();
559 }
560
561 for item in data.members() {
562 let name = item["name"].as_str().unwrap();
563 if !hidden.contains(&name) {
564 self.params.fields[name] = name.into();
565 }
566 }
567 self
568 }
569
570 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
571 let join_table = if self.params.join_table.is_empty() {
572 self.params.table.clone()
573 } else {
574 self.params.join_table.clone()
575 };
576 if value.is_boolean() {
577 if value.as_bool().unwrap() {
578 value = 1.into();
579 } else {
580 value = 0.into();
581 }
582 }
583 match compare {
584 "between" => {
585 self.params.where_and.push(format!(
586 "{}.{} between '{}' AND '{}'",
587 join_table, field, value[0], value[1]
588 ));
589 }
590 "set" => {
591 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
592 let mut wheredata = vec![];
593 for item in list.iter() {
594 wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
595 }
596 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
597 }
598 "notin" => {
599 let mut text = String::new();
600 for item in value.members() {
601 text = format!("{},'{}'", text, item);
602 }
603 text = text.trim_start_matches(",").into();
604 self.params.where_and.push(format!("{}.{} not in ({})", join_table, field, text));
605 }
606 "is" => {
607 self.params.where_and.push(format!("{}.{} is {}", join_table, field, value));
608 }
609 "notlike" => {
610 self.params.where_and.push(format!("{}.{} not like '{}'", join_table, field, value));
611 }
612 "in" => {
613 let mut text = String::new();
614 if value.is_array() {
615 for item in value.members() {
616 text = format!("{},'{}'", text, item);
617 }
618 } else if value.is_null() {
619 text = format!("{},null", text);
620 } else {
621 let value = value.as_str().unwrap();
622
623 let value: Vec<&str> = value.split(",").collect();
624 for item in value.iter() {
625 text = format!("{},'{}'", text, item);
626 }
627 }
628 text = text.trim_start_matches(",").into();
629
630 self.params.where_and.push(format!("{}.{} {} ({})", join_table, field, compare, text));
631 }
632 _ => {
633 self.params.where_and.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
634 }
635 }
636 self
637 }
638
639 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
640 let join_table = if self.params.join_table.is_empty() {
641 self.params.table.clone()
642 } else {
643 self.params.join_table.clone()
644 };
645
646 if value.is_boolean() {
647 if value.as_bool().unwrap() {
648 value = 1.into();
649 } else {
650 value = 0.into();
651 }
652 }
653
654 match compare {
655 "between" => {
656 self.params.where_or.push(format!(
657 "{}.{} between '{}' AND '{}'",
658 join_table, field, value[0], value[1]
659 ));
660 }
661 "set" => {
662 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
663 let mut wheredata = vec![];
664 for item in list.iter() {
665 wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
666 }
667 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
668 }
669 "notin" => {
670 let mut text = String::new();
671 for item in value.members() {
672 text = format!("{},'{}'", text, item);
673 }
674 text = text.trim_start_matches(",").into();
675 self.params.where_or.push(format!("{}.{} not in ({})", join_table, field, text));
676 }
677 "in" => {
678 let mut text = String::new();
679 if value.is_array() {
680 for item in value.members() {
681 text = format!("{},'{}'", text, item);
682 }
683 } else {
684 let value = value.as_str().unwrap();
685 let value: Vec<&str> = value.split(",").collect();
686 for item in value.iter() {
687 text = format!("{},'{}'", text, item);
688 }
689 }
690 text = text.trim_start_matches(",").into();
691 self.params.where_or.push(format!("{}.{} {} ({})", join_table, field, compare, text));
692 }
693 _ => {
694 self.params.where_or.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
695 }
696 }
697 self
698 }
699
700 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
701 self.params.where_column = format!(
702 "{}.{} {} {}.{}",
703 self.params.table, field_a, compare, self.params.table, field_b
704 );
705 self
706 }
707
708 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
709 self.params.page = page;
710 self.params.limit = limit;
711 self
712 }
713
714 fn column(&mut self, field: &str) -> JsonValue {
715 self.field(field);
716 let sql = self.params.select_sql();
717
718 if self.params.sql {
719 return JsonValue::from(sql);
720 }
721 let (state, data) = self.query(sql.as_str());
722 match state {
723 true => {
724 let mut list = array![];
725 for item in data.members() {
726 if self.params.json[field].is_empty() {
727 list.push(item[field].clone()).unwrap();
728 } else {
729 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
730 list.push(data).unwrap();
731 }
732 }
733 list
734 }
735 false => {
736 array![]
737 }
738 }
739 }
740
741 fn count(&mut self) -> JsonValue {
742 self.params.fields["count"] = "count(*) as count".to_string().into();
743 let sql = self.params.select_sql();
744 if self.params.sql {
745 return JsonValue::from(sql.clone());
746 }
747 let (state, data) = self.query(sql.as_str());
748 if state {
749 data[0]["count"].clone()
750 } else {
751 JsonValue::from(0)
752 }
753 }
754
755 fn max(&mut self, field: &str) -> JsonValue {
756 self.params.fields[field] = format!("max({00}) as {00}", field).into();
757 let sql = self.params.select_sql();
758 if self.params.sql {
759 return JsonValue::from(sql.clone());
760 }
761 let (state, data) = self.query(sql.as_str());
762 if state {
763 if data.len() > 1 {
764 return data.clone();
765 }
766 data[0][field].clone()
767 } else {
768 JsonValue::from(0)
769 }
770 }
771
772 fn min(&mut self, field: &str) -> JsonValue {
773 self.params.fields[field] = format!("min({00}) as {00}", field).into();
774 let sql = self.params.select_sql();
775 if self.params.sql {
776 return JsonValue::from(sql.clone());
777 }
778 let (state, data) = self.query(sql.as_str());
779 if state {
780 if data.len() > 1 {
781 return data;
782 }
783 data[0][field].clone()
784 } else {
785 JsonValue::from(0)
786 }
787 }
788
789 fn sum(&mut self, field: &str) -> JsonValue {
790 self.params.fields[field] = format!("sum({00}) as {00}", field).into();
791 let sql = self.params.select_sql();
792 if self.params.sql {
793 return JsonValue::from(sql.clone());
794 }
795 let (state, data) = self.query(sql.as_str());
796 match state {
797 true => {
798 if data.len() > 1 {
799 return data;
800 }
801 data[0][field].clone()
802 }
803 false => JsonValue::from(0),
804 }
805 }
806
807 fn avg(&mut self, field: &str) -> JsonValue {
808 self.params.fields[field] = format!("avg({00}) as {00}", field).into();
809 let sql = self.params.select_sql();
810 if self.params.sql {
811 return JsonValue::from(sql.clone());
812 }
813 let (state, data) = self.query(sql.as_str());
814 if state {
815 if data.len() > 1 {
816 return data;
817 }
818 data[0][field].clone()
819 } else {
820 JsonValue::from(0)
821 }
822 }
823
824 fn select(&mut self) -> JsonValue {
825 let sql = self.params.select_sql();
826 if self.params.sql {
827 return JsonValue::from(sql.clone());
828 }
829 let (state, mut data) = self.query(sql.as_str());
830 match state {
831 true => {
832 for (field, _) in self.params.json.entries() {
833 for item in data.members_mut() {
834 if !item[field].is_empty() {
835 let json = item[field].to_string();
836 item[field] = match json::parse(&json) {
837 Ok(e) => e,
838 Err(_) => JsonValue::from(json),
839 };
840 }
841 }
842 }
843 data.clone()
844 }
845 false => array![],
846 }
847 }
848
849 fn find(&mut self) -> JsonValue {
850 self.params.page = 1;
851 self.params.limit = 1;
852 let sql = self.params.select_sql();
853 if self.params.sql {
854 return JsonValue::from(sql.clone());
855 }
856 let (state, mut data) = self.query(sql.as_str());
857 match state {
858 true => {
859 if data.is_empty() {
860 return object! {};
861 }
862 for (field, _) in self.params.json.entries() {
863 if !data[0][field].is_empty() {
864 let json = data[0][field].to_string();
865 let json = json::parse(&json).unwrap_or(array![]);
866 data[0][field] = json;
867 } else {
868 data[0][field] = array![];
869 }
870 }
871 data[0].clone()
872 }
873 false => {
874 object! {}
875 }
876 }
877 }
878
879 fn value(&mut self, field: &str) -> JsonValue {
880 self.params.fields = object! {};
881 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
882 self.params.page = 1;
883 self.params.limit = 1;
884 let sql = self.params.select_sql();
885 if self.params.sql {
886 return JsonValue::from(sql.clone());
887 }
888 let (state, mut data) = self.query(sql.as_str());
889 match state {
890 true => {
891 for (field, _) in self.params.json.entries() {
892 if !data[0][field].is_empty() {
893 let json = data[0][field].to_string();
894 let json = json::parse(&json).unwrap_or(array![]);
895 data[0][field] = json;
896 } else {
897 data[0][field] = array![];
898 }
899 }
900 data[0][field].clone()
901 }
902 false => {
903 if self.connection.debug {
904 info!("{:?}", data);
905 }
906 JsonValue::Null
907 }
908 }
909 }
910
911 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
912 let mut fields = vec![];
913 let mut values = vec![];
914 if !self.params.autoinc && data["id"].is_empty() {
915 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
916 }
917 for (field, value) in data.entries() {
918 fields.push(field);
919
920 if value.is_string() {
921 values.push(format!("'{}'", value.to_string().replace("'", "''")));
922 continue;
923 } else if value.is_array() {
924 if self.params.json[field].is_empty() {
925 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
926 values.push(format!("'{}'", array));
927 } else {
928 let json = value.to_string();
929 let json = json.replace("'", "''");
930 values.push(format!("'{}'", json));
931 }
932 continue;
933 } else if value.is_object() {
934 if self.params.json[field].is_empty() {
935 values.push(format!("'{}'", value));
936 } else {
937 let json = value.to_string();
938 let json = json.replace("'", "''");
939 values.push(format!("'{}'", json));
940 }
941 continue;
942 } else if value.is_number() || value.is_boolean() || value.is_null() {
943 values.push(format!("{}", value));
944 continue;
945 } else {
946 values.push(format!("'{}'", value));
947 continue;
948 }
949 }
950 let fields = fields.join(",");
951 let values = values.join(",");
952
953 let sql = format!(
954 "INSERT INTO {} ({}) VALUES ({});",
955 self.params.table, fields, values
956 );
957 if self.params.sql {
958 return JsonValue::from(sql.clone());
959 }
960 let (state, ids) = self.execute(sql.as_str());
961
962 match state {
963 true => match self.params.autoinc {
964 true => ids.clone(),
965 false => data["id"].clone(),
966 },
967 false => {
968 let thread_id = format!("{:?}", thread::current().id());
969 error!("添加失败: {} {:?} {}", thread_id, ids, sql);
970 JsonValue::from("")
971 }
972 }
973 }
974 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
975 let mut fields = String::new();
976 if !self.params.autoinc && data[0]["id"].is_empty() {
977 data[0]["id"] = "".into();
978 }
979 for (field, _) in data[0].entries() {
980 fields = format!("{},{}", fields, field);
981 }
982 fields = fields.trim_start_matches(",").parse().unwrap();
983
984 let core_count = num_cpus::get();
985 let mut p = pools::Pool::new(core_count * 4);
986
987 let mut rng = rng();
988 let i: i32 = rng.random_range(1..=100);
989 let autoinc = self.params.autoinc;
990 for list in data.members() {
991 let mut item = list.clone();
992 p.execute(move |pcindex| {
993 if !autoinc && item["id"].is_empty() {
994 let id = format!(
995 "{:X}{:X}{}",
996 Local::now().timestamp_nanos_opt().unwrap(),
997 pcindex,
998 i
999 );
1000 item["id"] = id.into();
1001 }
1002 let mut values = "".to_string();
1003 for (_, value) in item.entries() {
1004 if value.is_string() {
1005 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1006 } else if value.is_number() {
1007 values = format!("{},{}", values, value);
1008 } else if value.is_boolean() {
1009 values = format!("{},{}", values, value);
1010 continue;
1011 } else {
1012 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1013 }
1014 }
1015 values = format!("({})", values.trim_start_matches(","));
1016 array![item["id"].clone(), values]
1017 });
1018 }
1019 let (ids_list, mut values) = p.insert_all();
1020 values = values.trim_start_matches(",").parse().unwrap();
1021 let sql = format!(
1022 "INSERT INTO {} ({}) VALUES {};",
1023 self.params.table, fields, values
1024 );
1025
1026 if self.params.sql {
1027 return JsonValue::from(sql.clone());
1028 }
1029 let (state, data) = self.execute(sql.as_str());
1030 match state {
1031 true => match autoinc {
1032 true => data,
1033 false => JsonValue::from(ids_list),
1034 },
1035 false => {
1036 error!("insert_all: {:?}", data);
1037 array![]
1038 }
1039 }
1040 }
1041 fn update(&mut self, data: JsonValue) -> JsonValue {
1042 let mut values = vec![];
1043 for (field, value) in data.entries() {
1044 if value.is_string() {
1045 values.push(format!(
1046 "{}='{}'",
1047 field,
1048 value.to_string().replace("'", "''")
1049 ));
1050 } else if value.is_number() {
1051 values.push(format!("{}= {}", field, value));
1052 } else if value.is_array() {
1053 if self.params.json[field].is_empty() {
1054 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1055 values.push(format!("{}='{}'", field, array));
1056 } else {
1057 let json = value.to_string();
1058 let json = json.replace("'", "''");
1059 values.push(format!("{}='{}'", field, json));
1060 }
1061 continue;
1062 } else if value.is_object() {
1063 if self.params.json[field].is_empty() {
1064 values.push(format!("{}='{}'", field, value));
1065 } else {
1066 if value.is_empty() {
1067 values.push(format!("{}=''", field));
1068 continue;
1069 }
1070 let json = value.to_string();
1071 let json = json.replace("'", "''");
1072 values.push(format!("{}='{}'", field, json));
1073 }
1074 continue;
1075 } else if value.is_boolean() {
1076 values.push(format!("{}= {}", field, value));
1077 } else {
1078 values.push(format!("{}=\"{}\"", field, value));
1079 }
1080 }
1081
1082 for (field, value) in self.params.inc_dec.entries() {
1083 values.push(format!("{} = {}", field, value.to_string().clone()));
1084 }
1085
1086 let values = values.join(",");
1087
1088 let sql = format!(
1089 "UPDATE {} SET {} {};",
1090 self.params.table.clone(),
1091 values,
1092 self.params.where_sql()
1093 );
1094 if self.params.sql {
1095 return JsonValue::from(sql.clone());
1096 }
1097 let (state, data) = self.execute(sql.as_str());
1098 if state {
1099 data
1100 } else {
1101 let thread_id = format!("{:?}", thread::current().id());
1102 error!("update: {} {:?} {}", thread_id, data, sql);
1103 0.into()
1104 }
1105 }
1106 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1107 let mut values = vec![];
1108
1109 let mut ids = vec![];
1110 for (field, _) in data[0].entries() {
1111 if field == "id" {
1112 continue;
1113 }
1114 let mut fields = vec![];
1115 for row in data.members() {
1116 let value = row[field].clone();
1117 let id = row["id"].clone();
1118 ids.push(id.clone());
1119 if value.is_string() {
1120 fields.push(format!(
1121 "WHEN '{}' THEN '{}'",
1122 id,
1123 value.to_string().replace("'", "''")
1124 ));
1125 } else if value.is_array() || value.is_object() {
1126 if self.params.json[field].is_empty() {
1127 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1128 } else {
1129 let json = value.to_string();
1130 let json = json.replace("'", "''");
1131 fields.push(format!("WHEN '{}' THEN '{}'", id, json));
1132 }
1133 continue;
1134 } else if value.is_number() || value.is_boolean() || value.is_null() {
1135 fields.push(format!("WHEN '{}' THEN {}", id, value));
1136 } else {
1137 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1138 }
1139 }
1140 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1141 }
1142 self.where_and("id", "in", ids.into());
1143 for (field, value) in self.params.inc_dec.entries() {
1144 values.push(format!("{} = {}", field, value.to_string().clone()));
1145 }
1146
1147 let values = values.join(",");
1148 let sql = format!(
1149 "UPDATE {} SET {} {} {};",
1150 self.params.table.clone(),
1151 values,
1152 self.params.where_sql(),
1153 self.params.page_limit_sql()
1154 );
1155 if self.params.sql {
1156 return JsonValue::from(sql.clone());
1157 }
1158 let (state, data) = self.execute(sql.as_str());
1159 if state {
1160 data
1161 } else {
1162 error!("update_all: {:?}", data);
1163 JsonValue::from(0)
1164 }
1165 }
1166
1167 fn delete(&mut self) -> JsonValue {
1168 let sql = format!(
1169 "delete FROM {} {} {};",
1170 self.params.table.clone(),
1171 self.params.where_sql(),
1172 self.params.page_limit_sql()
1173 );
1174 if self.params.sql {
1175 return JsonValue::from(sql.clone());
1176 }
1177 let (state, data) = self.execute(sql.as_str());
1178 match state {
1179 true => data,
1180 false => {
1181 error!("delete 失败>>> {:?}", data);
1182 JsonValue::from(0)
1183 }
1184 }
1185 }
1186
1187 fn transaction(&mut self) -> bool {
1188 let thread_id = format!("{:?}", thread::current().id());
1189
1190 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1191 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1192 t += 1;
1193 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1194 return true;
1195 }
1196
1197 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1198 let key = format!("{}{}", self.default, thread_id);
1199 TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(self.client.get_connect().unwrap())));
1200
1201 let sql = "START TRANSACTION;".to_string();
1202 let (state, _) = self.execute(sql.as_str());
1203 match state {
1204 true => {
1205 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1206 let (state, _) = self.execute(sql.as_str());
1207 match state {
1208 true => state,
1209 false => {
1210 TRANS.lock().unwrap().remove(&*thread_id.clone());
1211 TR.lock().unwrap().remove(&key.clone());
1212 state
1213 }
1214 }
1215 }
1216 false => {
1217 TRANS.lock().unwrap().remove(&*thread_id.clone());
1218 TR.lock().unwrap().remove(&key.clone());
1219 state
1220 }
1221 }
1222 }
1223 fn commit(&mut self) -> bool {
1224 let thread_id = format!("{:?}", thread::current().id());
1225 let sql = "COMMIT".to_string();
1226
1227 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1228 if t > 1 {
1229 t -= 1;
1230 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1231 return true;
1232 }
1233 let (state, data) = self.execute(sql.as_str());
1234 TRANS.lock().unwrap().remove(&thread_id);
1235 let key = format!("{}{}", self.default, thread_id);
1236 TR.lock().unwrap().remove(&*key);
1237 match state {
1238 true => {}
1239 false => {
1240 error!("提交事务失败: {}", data);
1241 }
1242 }
1243 state
1244 }
1245
1246 fn rollback(&mut self) -> bool {
1247 let thread_id = format!("{:?}", thread::current().id());
1248 let sql = "ROLLBACK".to_string();
1249
1250 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1251 if t > 1 {
1252 t -= 1;
1253 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1254 return true;
1255 }
1256 let (state, data) = self.execute(sql.as_str());
1257 TRANS.lock().unwrap().remove(&thread_id);
1258 let key = format!("{}{}", self.default, thread_id);
1259 TR.lock().unwrap().remove(&*key);
1260 match state {
1261 true => {}
1262 false => {
1263 error!("回滚失败: {}", data);
1264 }
1265 }
1266 state
1267 }
1268
1269 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1270 let (state, data) = self.query(sql);
1271 match state {
1272 true => Ok(data),
1273 false => Err(data.to_string()),
1274 }
1275 }
1276
1277 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1278 let (state, data) = self.execute(sql);
1279 match state {
1280 true => Ok(data),
1281 false => Err(data.to_string()),
1282 }
1283 }
1284
1285 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1286 self.params.inc_dec[field] = format!("{} + {}", field, num).into();
1287 self
1288 }
1289
1290 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1291 self.params.inc_dec[field] = format!("{} - {}", field, num).into();
1292 self
1293 }
1294 fn buildsql(&mut self) -> String {
1295 self.fetch_sql();
1296 let sql = self.select().to_string();
1297 format!("( {} ) {}", sql, self.params.table)
1298 }
1299
1300 fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1301 let main_fields = if main_fields.is_empty() {
1302 "id"
1303 } else {
1304 main_fields
1305 };
1306 let right_fields = if right_fields.is_empty() {
1307 self.params.table.clone()
1308 } else {
1309 right_fields.to_string().clone()
1310 };
1311 self.params.join_table = table.to_string();
1312 self.params.join.push(format!(
1313 " LEFT JOIN {} ON {}.{} = {}.{}",
1314 table, self.params.table, main_fields, table, right_fields
1315 ));
1316 self
1317 }
1318
1319 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1320 let main_fields = if main_fields.is_empty() {
1321 "id"
1322 } else {
1323 main_fields
1324 };
1325 let second_fields = if second_fields.is_empty() {
1326 self.params.table.clone()
1327 } else {
1328 second_fields.to_string().clone()
1329 };
1330 let sec_table_name = format!("{}{}", table, "_2");
1331 let second_table = format!("{} {}", table, sec_table_name.clone());
1332 self.params.join_table = sec_table_name.clone();
1333 self.params.join.push(format!(
1334 " INNER JOIN {} ON {}.{} = {}.{}",
1335 second_table, self.params.table, main_fields, sec_table_name, second_fields
1336 ));
1337 self
1338 }
1339}