1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use rand::{rng, Rng};
13use br_pgsql::connect::Connect;
14use br_pgsql::pools::Pools;
15
16lazy_static! {
17 static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
18 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
19}
20#[derive(Clone)]
21pub struct Pgsql {
22 pub connection: Connection,
24 pub default: String,
26 pub params: Params,
27 pub client: Pools,
28}
29
30impl Pgsql {
31 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
32 let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {:?}", e))?;
33
34 let cp_connection = connection.clone();
35 let config = object! {
36 debug: cp_connection.debug,
37 username: cp_connection.username,
38 userpass: cp_connection.userpass,
39 database: cp_connection.database,
40 hostname: cp_connection.hostname,
41 hostport: port,
42 charset: cp_connection.charset.str(),
43 };
44 let mut pgsql = br_pgsql::Pgsql::new(config)?;
45
46 let pools = pgsql.pools()?;
47 Ok(Self {
48 connection,
49 default: default.clone(),
50 params: Params::default("pgsql"),
51 client: pools,
52 })
53 }
54
55 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
100 let thread_id = format!("{:?}", thread::current().id());
101 let _key = format!("{}{}", self.default, thread_id);
102
103 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
104 let key = format!("{}{}", self.default, thread_id);
105 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
106 let mut t = db.lock().unwrap();
107 match t.query(sql) {
108 Ok(e) => {
109 if self.connection.debug {
110 info!("查询成功: {} {}", thread_id.clone(), sql);
111 }
112 (true, e.rows)
113 }
114 Err(e) => {
115 error!("事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
116 (false, JsonValue::from(e.to_string()))
117 }
118 }
119 } else {
120 let mut db = match self.client.get_connect() {
121 Ok(c) => c,
122 Err(e) => {
123 error!("非事务查询失败: get_connect 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
124 return (false, JsonValue::from(e.to_string()));
125 }
126 };
127 match db.query(sql) {
128 Ok(e) => {
129 if self.connection.debug {
130 info!("查询成功: {} {}", thread_id.clone(), sql);
131 }
132 (true, e.rows)
133 }
134 Err(e) => {
135 error!("非事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
136 (false, JsonValue::from(e.to_string()))
137 }
138 }
139 }
140 }
141 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
142 let thread_id = format!("{:?}", thread::current().id());
143
144 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
145 let key = format!("{}{}", self.default, thread_id);
146 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
147 let mut t = db.lock().unwrap();
148 match t.execute(sql) {
149 Ok(e) => {
150 if self.connection.debug {
151 info!("提交成功: {} {}", thread_id.clone(), sql);
152 }
153 if sql.contains("INSERT") {
154 (true, e.rows)
155 } else {
156 (true, e.affect_count.into())
157 }
158 }
159 Err(e) => {
160 error!("事务提交失败: {} {} {}", thread_id, e, sql);
161 (false, JsonValue::from(e.to_string()))
162 }
163 }
164 } else {
165 let mut db = self.client.get_connect().unwrap();
166 match db.execute(sql) {
167 Ok(e) => {
168 if self.connection.debug {
169 info!("提交成功: {} {}", thread_id.clone(), sql);
170 }
171 if sql.contains("INSERT") {
172 (true, e.rows)
173 } else {
174 (true, e.affect_count.into())
175 }
176 }
177 Err(e) => {
178 error!("非事务提交失败: {} {} {}", thread_id, e, sql);
179 (false, JsonValue::from(e.to_string()))
180 }
181 }
182 }
183 }
184}
185
186impl DbMode for Pgsql {
187 fn database_tables(&mut self) -> JsonValue {
188 let sql = "SHOW TABLES".to_string();
189 match self.sql(sql.as_str()) {
190 Ok(e) => {
191 let mut list = vec![];
192 for item in e.members() {
193 for (_, value) in item.entries() {
194 list.push(value.clone());
195 }
196 }
197 list.into()
198 }
199 Err(_) => {
200 array![]
201 }
202 }
203 }
204
205 fn database_create(&mut self, name: &str) -> bool {
206 let sql = format!("CREATE DATABASE {}", name);
207
208 let (state, data) = self.execute(sql.as_str());
209 match state {
210 true => data.as_bool().unwrap(),
211 false => {
212 error!("创建数据库失败: {:?}", data);
213 false
214 }
215 }
216 }
217}
218
219impl Mode for Pgsql {
220 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
221 let mut sql = String::new();
222 let mut comments = vec![];
223
224 if !options.table_unique.is_empty() {
226 let unique = format!(
227 "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
228 options.table_unique.join("_"),
229 options.table_name,
230 options.table_unique.join(",")
231 );
232 comments.push(unique);
233 }
234
235 for row in options.table_index.iter() {
237 let index = format!(
238 "CREATE INDEX {01}_index_{} ON {} ({})",
239 row.join("_"),
240 options.table_name,
241 row.join(",")
242 );
243 comments.push(index);
244 }
245
246 for (name, field) in options.table_fields.entries_mut() {
247 field["table_name"] = options.table_name.clone().into();
248 let row = br_fields::field("pgsql", name, field.clone());
249 let rows = row.split("comment").collect::<Vec<&str>>();
250 comments.push(format!(
251 "COMMENT ON COLUMN {}.{} IS {};",
252 options.table_name, name, rows[1]
253 ));
254 sql = format!("{} {},\r\n", sql, rows[0]);
255 }
256
257 let primary_key = format!(
258 "CONSTRAINT {}_{} PRIMARY KEY ({})",
259 options.table_name, options.table_key, options.table_key
260 );
261 let sql = format!(
262 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
263 options.table_name,
264 sql.trim_end_matches(",\r\n"),
265 primary_key
266 );
267 comments.insert(0, sql);
268
269 for (_name, field) in options.table_fields.entries() {
270 field["mode"].as_str().unwrap();
271 {}
272 }
273
274 if self.params.sql {
275 let info = comments.join("\r\n");
276 return JsonValue::from(info);
277 }
278 for comment in comments {
279 let (state, _) = self.execute(comment.as_str());
280 match state {
281 true => {}
282 false => {
283 return JsonValue::from(state);
284 }
285 }
286 }
287 JsonValue::from(true)
288 }
289
290 fn table_update(&mut self, options: TableOptions) -> JsonValue {
291 let fields_list = self.table_info(&options.table_name);
292 let mut put = vec![];
293 let mut add = vec![];
294 let mut del = vec![];
295 let mut comments = vec![];
296
297 for (key, _) in fields_list.entries() {
298 if options.table_fields[key].is_empty() {
299 del.push(key);
300 }
301 }
302 for (name, field) in options.table_fields.entries() {
303 if !fields_list[name].is_empty() {
304 let old_comment = fields_list[name]["comment"].to_string();
305 let new_comment = br_fields::field("pgsql", name, field.clone());
306 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
307 let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
308 if old_comment == new_comment_text {
309 continue;
310 }
311 put.push(name);
312 } else {
313 add.push(name);
314 }
315 }
316
317 for name in add.iter() {
318 let name = name.to_string();
319 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
320 let rows = row.split("comment").collect::<Vec<&str>>();
321 comments.push(format!(
322 r#"ALTER TABLE "{}" add {};"#,
323 options.table_name, rows[0]
324 ));
325 comments.push(format!(
326 "COMMENT ON COLUMN {}.{} IS {};",
327 options.table_name, name, rows[1]
328 ));
329 }
330 for name in del.iter() {
331 comments.push(format!(
332 "ALTER TABLE {} DROP {};\r\n",
333 options.table_name, name
334 ));
335 }
336 for name in put.iter() {
337 let name = name.to_string();
338 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
339 let rows = row.split("comment").collect::<Vec<&str>>();
340
341 let sql = rows[0].split(" ").collect::<Vec<&str>>();
342 let text = format!(
343 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
344 options.table_name, name, sql[1]
345 );
346 comments.push(text.clone());
347 if sql.len() > 3 && !sql[3].is_empty() {
348 let text = format!(
349 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
350 options.table_name, name, sql[3]
351 );
352 comments.push(text.clone());
353 }
354
355 comments.push(format!(
356 "COMMENT ON COLUMN {}.{} IS {};",
357 options.table_name, name, rows[1]
358 ));
359 }
360
361 let mut unique_new = vec![];
362 let mut index_new = vec![];
363 let mut primary_key = vec![];
364 let (_, index_list) = self.query(
365 format!(
366 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
367 options.table_name
368 ).as_str(),
369 );
370 for item in index_list.members() {
371 let key_name = item["indexname"].as_str().unwrap();
372 let indexdef = item["indexdef"].to_string();
373
374 if indexdef.contains(
375 format!(
376 "CREATE UNIQUE INDEX {}_{} ON",
377 options.table_name, options.table_key
378 ).as_str(),
379 ) {
380 primary_key.push(key_name.to_string());
381 continue;
382 }
383 if indexdef.contains("CREATE UNIQUE INDEX") {
384 unique_new.push(key_name.to_string());
385 continue;
386 }
387 if indexdef.contains("CREATE INDEX") {
388 index_new.push(key_name.to_string());
389 continue;
390 }
391 }
392
393 if !options.table_unique.is_empty() {
394 let name = format!(
395 "{}_unique_{}",
396 options.table_name,
397 options.table_unique.join("_")
398 );
399 let unique = format!(
400 "CREATE UNIQUE INDEX {} ON {} ({});",
401 name,
402 options.table_name,
403 options.table_unique.join(",")
404 );
405 if !unique_new.contains(&name) {
406 comments.push(unique);
407 } else {
408 unique_new.retain(|x| *x != name);
409 }
410 }
411
412 for row in options.table_index.iter() {
414 let name = format!("{}_index_{}", options.table_name, row.join("_"));
415 let index = format!(
416 "CREATE INDEX {} ON {} ({})",
417 name,
418 options.table_name,
419 row.join(",")
420 );
421 if !index_new.contains(&name) {
422 comments.push(index);
423 } else {
424 index_new.retain(|x| *x != name);
425 }
426 }
427
428 for item in unique_new {
429 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
430 }
431 for item in index_new {
432 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
433 }
434
435 if self.params.sql {
436 return JsonValue::from(comments.join(""));
437 }
438
439 if comments.is_empty() {
440 return JsonValue::from(-1);
441 }
442
443 for item in comments.iter() {
444 let (state, res) = self.execute(item.as_str());
445 match state {
446 true => {}
447 false => {
448 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
449 return JsonValue::from(0);
450 }
451 }
452 }
453 JsonValue::from(1)
454 }
455
456 fn table_info(&mut self, table: &str) -> JsonValue {
457 let sql = format!(
458 "SELECT COL.COLUMN_NAME,
459 COL.DATA_TYPE,
460 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
461 LEFT JOIN
462 pg_catalog.pg_description DESCRIPTION
463 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
464 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{}'",
465 table
466 );
467 let (state, data) = self.query(sql.as_str());
468 let mut list = object! {};
469 if state {
470 for item in data.members() {
471 let mut row = object! {};
472 row["field"] = item["column_name"].clone();
473 row["comment"] = item["comment"].clone();
474 row["type"] = item["data_type"].clone();
475 list[row["field"].as_str().unwrap()] = row.clone();
476 }
477 list
478 } else {
479 list
480 }
481 }
482
483 fn table_is_exist(&mut self, name: &str) -> bool {
484 let sql = format!(
485 "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{}')",
486 name
487 );
488 let (state, data) = self.query(sql.as_str());
489 match state {
490 true => {
491 for item in data.members() {
492 if item.has_key("exists") {
493 return item["exists"].as_bool().unwrap();
494 }
495 }
496 false
497 }
498 false => false,
499 }
500 }
501
502 fn table(&mut self, name: &str) -> &mut Pgsql {
503 self.params = Params::default(self.connection.mode.str().as_str());
504 self.params.table = format!("{}{}", self.connection.prefix, name);
505 self.params.join_table = self.params.table.clone();
506 self
507 }
508
509 fn change_table(&mut self, name: &str) -> &mut Self {
510 self.params.join_table = name.to_string();
511 self
512 }
513
514 fn autoinc(&mut self) -> &mut Self {
515 self.params.autoinc = true;
516 self
517 }
518
519 fn fetch_sql(&mut self) -> &mut Self {
520 self.params.sql = true;
521 self
522 }
523
524 fn order(&mut self, field: &str, by: bool) -> &mut Self {
525 self.params.order[field] = {
526 if by {
527 "DESC"
528 } else {
529 "ASC"
530 }
531 }.into();
532 self
533 }
534
535 fn group(&mut self, field: &str) -> &mut Self {
536 let fields: Vec<&str> = field.split(",").collect();
537 for field in fields.iter() {
538 let field = field.to_string();
539 self.params.group[field.as_str()] = field.clone().into();
540 self.params.fields[field.as_str()] = field.clone().into();
541 }
542 self
543 }
544
545 fn distinct(&mut self) -> &mut Self {
546 self.params.distinct = true;
547 self
548 }
549
550 fn json(&mut self, field: &str) -> &mut Self {
551 let list: Vec<&str> = field.split(",").collect();
552 for item in list.iter() {
553 self.params.json[item.to_string().as_str()] = item.to_string().into();
554 }
555 self
556 }
557
558 fn field(&mut self, field: &str) -> &mut Self {
559 let list: Vec<&str> = field.split(",").collect();
560 let join_table = if self.params.join_table.is_empty() {
561 self.params.table.clone()
562 } else {
563 self.params.join_table.clone()
564 };
565 for item in list.iter() {
566 if item.contains(" as ") {
567 let text = item.split(" as ").collect::<Vec<&str>>();
568 if text[0].contains("count(") {
569 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
570 } else {
571 self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
572 }
573 } else {
574 self.params.fields[item.to_string().as_str()] = format!("{}.{}", join_table, item).into();
575 }
576 }
577 self
578 }
579
580 fn hidden(&mut self, name: &str) -> &mut Self {
581 let hidden: Vec<&str> = name.split(",").collect();
582
583 let fields_list = self.table_info(self.params.clone().table.as_str());
584 let mut data = array![];
585 for item in fields_list.members() {
586 data.push(object! {
587 "name":item["field"].as_str().unwrap()
588 }).unwrap();
589 }
590
591 for item in data.members() {
592 let name = item["name"].as_str().unwrap();
593 if !hidden.contains(&name) {
594 self.params.fields[name] = name.into();
595 }
596 }
597 self
598 }
599
600 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
601 let join_table = if self.params.join_table.is_empty() {
602 self.params.table.clone()
603 } else {
604 self.params.join_table.clone()
605 };
606 if value.is_boolean() {
607 if value.as_bool().unwrap() {
608 value = 1.into();
609 } else {
610 value = 0.into();
611 }
612 }
613 match compare {
614 "between" => {
615 self.params.where_and.push(format!(
616 "{}.{} between '{}' AND '{}'",
617 join_table, field, value[0], value[1]
618 ));
619 }
620 "set" => {
621 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
622 let mut wheredata = vec![];
623 for item in list.iter() {
624 wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
625 }
626 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
627 }
628 "notin" => {
629 let mut text = String::new();
630 for item in value.members() {
631 text = format!("{},'{}'", text, item);
632 }
633 text = text.trim_start_matches(",").into();
634 self.params.where_and.push(format!("{}.{} not in ({})", join_table, field, text));
635 }
636 "is" => {
637 self.params.where_and.push(format!("{}.{} is {}", join_table, field, value));
638 }
639 "notlike" => {
640 self.params.where_and.push(format!("{}.{} not like '{}'", join_table, field, value));
641 }
642 "in" => {
643 let mut text = String::new();
644 if value.is_array() {
645 for item in value.members() {
646 text = format!("{},'{}'", text, item);
647 }
648 } else if value.is_null() {
649 text = format!("{},null", text);
650 } else {
651 let value = value.as_str().unwrap();
652
653 let value: Vec<&str> = value.split(",").collect();
654 for item in value.iter() {
655 text = format!("{},'{}'", text, item);
656 }
657 }
658 text = text.trim_start_matches(",").into();
659
660 self.params.where_and.push(format!("{}.{} {} ({})", join_table, field, compare, text));
661 }
662 _ => {
663 self.params.where_and.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
664 }
665 }
666 self
667 }
668
669 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
670 let join_table = if self.params.join_table.is_empty() {
671 self.params.table.clone()
672 } else {
673 self.params.join_table.clone()
674 };
675
676 if value.is_boolean() {
677 if value.as_bool().unwrap() {
678 value = 1.into();
679 } else {
680 value = 0.into();
681 }
682 }
683
684 match compare {
685 "between" => {
686 self.params.where_or.push(format!(
687 "{}.{} between '{}' AND '{}'",
688 join_table, field, value[0], value[1]
689 ));
690 }
691 "set" => {
692 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
693 let mut wheredata = vec![];
694 for item in list.iter() {
695 wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
696 }
697 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
698 }
699 "notin" => {
700 let mut text = String::new();
701 for item in value.members() {
702 text = format!("{},'{}'", text, item);
703 }
704 text = text.trim_start_matches(",").into();
705 self.params.where_or.push(format!("{}.{} not in ({})", join_table, field, text));
706 }
707 "in" => {
708 let mut text = String::new();
709 if value.is_array() {
710 for item in value.members() {
711 text = format!("{},'{}'", text, item);
712 }
713 } else {
714 let value = value.as_str().unwrap();
715 let value: Vec<&str> = value.split(",").collect();
716 for item in value.iter() {
717 text = format!("{},'{}'", text, item);
718 }
719 }
720 text = text.trim_start_matches(",").into();
721 self.params.where_or.push(format!("{}.{} {} ({})", join_table, field, compare, text));
722 }
723 _ => {
724 self.params.where_or.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
725 }
726 }
727 self
728 }
729
730 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
731 self.params.where_column = format!(
732 "{}.{} {} {}.{}",
733 self.params.table, field_a, compare, self.params.table, field_b
734 );
735 self
736 }
737
738 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
739 self.params.page = page;
740 self.params.limit = limit;
741 self
742 }
743
744 fn column(&mut self, field: &str) -> JsonValue {
745 self.field(field);
746 let sql = self.params.select_sql();
747
748 if self.params.sql {
749 return JsonValue::from(sql);
750 }
751 let (state, data) = self.query(sql.as_str());
752 match state {
753 true => {
754 let mut list = array![];
755 for item in data.members() {
756 if self.params.json[field].is_empty() {
757 list.push(item[field].clone()).unwrap();
758 } else {
759 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
760 list.push(data).unwrap();
761 }
762 }
763 list
764 }
765 false => {
766 array![]
767 }
768 }
769 }
770
771 fn count(&mut self) -> JsonValue {
772 self.params.fields["count"] = "count(*) as count".to_string().into();
773 let sql = self.params.select_sql();
774 if self.params.sql {
775 return JsonValue::from(sql.clone());
776 }
777 let (state, data) = self.query(sql.as_str());
778 if state {
779 data[0]["count"].clone()
780 } else {
781 JsonValue::from(0)
782 }
783 }
784
785 fn max(&mut self, field: &str) -> JsonValue {
786 self.params.fields[field] = format!("max({00}) as {00}", field).into();
787 let sql = self.params.select_sql();
788 if self.params.sql {
789 return JsonValue::from(sql.clone());
790 }
791 let (state, data) = self.query(sql.as_str());
792 if state {
793 if data.len() > 1 {
794 return data.clone();
795 }
796 data[0][field].clone()
797 } else {
798 JsonValue::from(0)
799 }
800 }
801
802 fn min(&mut self, field: &str) -> JsonValue {
803 self.params.fields[field] = format!("min({00}) as {00}", field).into();
804 let sql = self.params.select_sql();
805 if self.params.sql {
806 return JsonValue::from(sql.clone());
807 }
808 let (state, data) = self.query(sql.as_str());
809 if state {
810 if data.len() > 1 {
811 return data;
812 }
813 data[0][field].clone()
814 } else {
815 JsonValue::from(0)
816 }
817 }
818
819 fn sum(&mut self, field: &str) -> JsonValue {
820 self.params.fields[field] = format!("sum({00}) as {00}", field).into();
821 let sql = self.params.select_sql();
822 if self.params.sql {
823 return JsonValue::from(sql.clone());
824 }
825 let (state, data) = self.query(sql.as_str());
826 match state {
827 true => {
828 if data.len() > 1 {
829 return data;
830 }
831 data[0][field].clone()
832 }
833 false => JsonValue::from(0),
834 }
835 }
836
837 fn avg(&mut self, field: &str) -> JsonValue {
838 self.params.fields[field] = format!("avg({00}) as {00}", field).into();
839 let sql = self.params.select_sql();
840 if self.params.sql {
841 return JsonValue::from(sql.clone());
842 }
843 let (state, data) = self.query(sql.as_str());
844 if state {
845 if data.len() > 1 {
846 return data;
847 }
848 data[0][field].clone()
849 } else {
850 JsonValue::from(0)
851 }
852 }
853
854 fn select(&mut self) -> JsonValue {
855 let sql = self.params.select_sql();
856 if self.params.sql {
857 return JsonValue::from(sql.clone());
858 }
859 let (state, mut data) = self.query(sql.as_str());
860 match state {
861 true => {
862 for (field, _) in self.params.json.entries() {
863 for item in data.members_mut() {
864 if !item[field].is_empty() {
865 let json = item[field].to_string();
866 item[field] = match json::parse(&json) {
867 Ok(e) => e,
868 Err(_) => JsonValue::from(json),
869 };
870 }
871 }
872 }
873 data.clone()
874 }
875 false => array![],
876 }
877 }
878
879 fn find(&mut self) -> JsonValue {
880 self.params.page = 1;
881 self.params.limit = 1;
882 let sql = self.params.select_sql();
883 if self.params.sql {
884 return JsonValue::from(sql.clone());
885 }
886 let (state, mut data) = self.query(sql.as_str());
887 match state {
888 true => {
889 if data.is_empty() {
890 return object! {};
891 }
892 for (field, _) in self.params.json.entries() {
893 if !data[0][field].is_empty() {
894 let json = data[0][field].to_string();
895 let json = json::parse(&json).unwrap_or(array![]);
896 data[0][field] = json;
897 } else {
898 data[0][field] = array![];
899 }
900 }
901 data[0].clone()
902 }
903 false => {
904 object! {}
905 }
906 }
907 }
908
909 fn value(&mut self, field: &str) -> JsonValue {
910 self.params.fields = object! {};
911 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
912 self.params.page = 1;
913 self.params.limit = 1;
914 let sql = self.params.select_sql();
915 if self.params.sql {
916 return JsonValue::from(sql.clone());
917 }
918 let (state, mut data) = self.query(sql.as_str());
919 match state {
920 true => {
921 for (field, _) in self.params.json.entries() {
922 if !data[0][field].is_empty() {
923 let json = data[0][field].to_string();
924 let json = json::parse(&json).unwrap_or(array![]);
925 data[0][field] = json;
926 } else {
927 data[0][field] = array![];
928 }
929 }
930 data[0][field].clone()
931 }
932 false => {
933 if self.connection.debug {
934 info!("{:?}", data);
935 }
936 JsonValue::Null
937 }
938 }
939 }
940
941 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
942 let mut fields = vec![];
943 let mut values = vec![];
944 if !self.params.autoinc && data["id"].is_empty() {
945 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
946 }
947 for (field, value) in data.entries() {
948 fields.push(field);
949
950 if value.is_string() {
951 values.push(format!("'{}'", value.to_string().replace("'", "''")));
952 continue;
953 } else if value.is_array() {
954 if self.params.json[field].is_empty() {
955 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
956 values.push(format!("'{}'", array));
957 } else {
958 let json = value.to_string();
959 let json = json.replace("'", "''");
960 values.push(format!("'{}'", json));
961 }
962 continue;
963 } else if value.is_object() {
964 if self.params.json[field].is_empty() {
965 values.push(format!("'{}'", value));
966 } else {
967 let json = value.to_string();
968 let json = json.replace("'", "''");
969 values.push(format!("'{}'", json));
970 }
971 continue;
972 } else if value.is_number() || value.is_boolean() || value.is_null() {
973 values.push(format!("{}", value));
974 continue;
975 } else {
976 values.push(format!("'{}'", value));
977 continue;
978 }
979 }
980 let fields = fields.join(",");
981 let values = values.join(",");
982
983 let sql = format!(
984 "INSERT INTO {} ({}) VALUES ({});",
985 self.params.table, fields, values
986 );
987 if self.params.sql {
988 return JsonValue::from(sql.clone());
989 }
990 let (state, ids) = self.execute(sql.as_str());
991
992 match state {
993 true => match self.params.autoinc {
994 true => ids.clone(),
995 false => data["id"].clone(),
996 },
997 false => {
998 let thread_id = format!("{:?}", thread::current().id());
999 error!("添加失败: {} {:?} {}", thread_id, ids, sql);
1000 JsonValue::from("")
1001 }
1002 }
1003 }
1004 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1005 let mut fields = String::new();
1006 if !self.params.autoinc && data[0]["id"].is_empty() {
1007 data[0]["id"] = "".into();
1008 }
1009 for (field, _) in data[0].entries() {
1010 fields = format!("{},{}", fields, field);
1011 }
1012 fields = fields.trim_start_matches(",").parse().unwrap();
1013
1014 let core_count = num_cpus::get();
1015 let mut p = pools::Pool::new(core_count * 4);
1016
1017 let mut rng = rng();
1018 let i: i32 = rng.random_range(1..=100);
1019 let autoinc = self.params.autoinc;
1020 for list in data.members() {
1021 let mut item = list.clone();
1022 p.execute(move |pcindex| {
1023 if !autoinc && item["id"].is_empty() {
1024 let id = format!(
1025 "{:X}{:X}{}",
1026 Local::now().timestamp_nanos_opt().unwrap(),
1027 pcindex,
1028 i
1029 );
1030 item["id"] = id.into();
1031 }
1032 let mut values = "".to_string();
1033 for (_, value) in item.entries() {
1034 if value.is_string() {
1035 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1036 } else if value.is_number() {
1037 values = format!("{},{}", values, value);
1038 } else if value.is_boolean() {
1039 values = format!("{},{}", values, value);
1040 continue;
1041 } else {
1042 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1043 }
1044 }
1045 values = format!("({})", values.trim_start_matches(","));
1046 array![item["id"].clone(), values]
1047 });
1048 }
1049 let (ids_list, mut values) = p.insert_all();
1050 values = values.trim_start_matches(",").parse().unwrap();
1051 let sql = format!(
1052 "INSERT INTO {} ({}) VALUES {};",
1053 self.params.table, fields, values
1054 );
1055
1056 if self.params.sql {
1057 return JsonValue::from(sql.clone());
1058 }
1059 let (state, data) = self.execute(sql.as_str());
1060 match state {
1061 true => match autoinc {
1062 true => data,
1063 false => JsonValue::from(ids_list),
1064 },
1065 false => {
1066 error!("insert_all: {:?}", data);
1067 array![]
1068 }
1069 }
1070 }
1071 fn update(&mut self, data: JsonValue) -> JsonValue {
1072 let mut values = vec![];
1073 for (field, value) in data.entries() {
1074 if value.is_string() {
1075 values.push(format!(
1076 "{}='{}'",
1077 field,
1078 value.to_string().replace("'", "''")
1079 ));
1080 } else if value.is_number() {
1081 values.push(format!("{}= {}", field, value));
1082 } else if value.is_array() {
1083 if self.params.json[field].is_empty() {
1084 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1085 values.push(format!("{}='{}'", field, array));
1086 } else {
1087 let json = value.to_string();
1088 let json = json.replace("'", "''");
1089 values.push(format!("{}='{}'", field, json));
1090 }
1091 continue;
1092 } else if value.is_object() {
1093 if self.params.json[field].is_empty() {
1094 values.push(format!("{}='{}'", field, value));
1095 } else {
1096 if value.is_empty() {
1097 values.push(format!("{}=''", field));
1098 continue;
1099 }
1100 let json = value.to_string();
1101 let json = json.replace("'", "''");
1102 values.push(format!("{}='{}'", field, json));
1103 }
1104 continue;
1105 } else if value.is_boolean() {
1106 values.push(format!("{}= {}", field, value));
1107 } else {
1108 values.push(format!("{}=\"{}\"", field, value));
1109 }
1110 }
1111
1112 for (field, value) in self.params.inc_dec.entries() {
1113 values.push(format!("{} = {}", field, value.to_string().clone()));
1114 }
1115
1116 let values = values.join(",");
1117
1118 let sql = format!(
1119 "UPDATE {} SET {} {};",
1120 self.params.table.clone(),
1121 values,
1122 self.params.where_sql()
1123 );
1124 if self.params.sql {
1125 return JsonValue::from(sql.clone());
1126 }
1127 let (state, data) = self.execute(sql.as_str());
1128 if state {
1129 data
1130 } else {
1131 let thread_id = format!("{:?}", thread::current().id());
1132 error!("update: {} {:?} {}", thread_id, data, sql);
1133 0.into()
1134 }
1135 }
1136 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1137 let mut values = vec![];
1138
1139 let mut ids = vec![];
1140 for (field, _) in data[0].entries() {
1141 if field == "id" {
1142 continue;
1143 }
1144 let mut fields = vec![];
1145 for row in data.members() {
1146 let value = row[field].clone();
1147 let id = row["id"].clone();
1148 ids.push(id.clone());
1149 if value.is_string() {
1150 fields.push(format!(
1151 "WHEN '{}' THEN '{}'",
1152 id,
1153 value.to_string().replace("'", "''")
1154 ));
1155 } else if value.is_array() || value.is_object() {
1156 if self.params.json[field].is_empty() {
1157 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1158 } else {
1159 let json = value.to_string();
1160 let json = json.replace("'", "''");
1161 fields.push(format!("WHEN '{}' THEN '{}'", id, json));
1162 }
1163 continue;
1164 } else if value.is_number() || value.is_boolean() || value.is_null() {
1165 fields.push(format!("WHEN '{}' THEN {}", id, value));
1166 } else {
1167 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1168 }
1169 }
1170 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1171 }
1172 self.where_and("id", "in", ids.into());
1173 for (field, value) in self.params.inc_dec.entries() {
1174 values.push(format!("{} = {}", field, value.to_string().clone()));
1175 }
1176
1177 let values = values.join(",");
1178 let sql = format!(
1179 "UPDATE {} SET {} {} {};",
1180 self.params.table.clone(),
1181 values,
1182 self.params.where_sql(),
1183 self.params.page_limit_sql()
1184 );
1185 if self.params.sql {
1186 return JsonValue::from(sql.clone());
1187 }
1188 let (state, data) = self.execute(sql.as_str());
1189 if state {
1190 data
1191 } else {
1192 error!("update_all: {:?}", data);
1193 JsonValue::from(0)
1194 }
1195 }
1196
1197 fn delete(&mut self) -> JsonValue {
1198 let sql = format!(
1199 "delete FROM {} {} {};",
1200 self.params.table.clone(),
1201 self.params.where_sql(),
1202 self.params.page_limit_sql()
1203 );
1204 if self.params.sql {
1205 return JsonValue::from(sql.clone());
1206 }
1207 let (state, data) = self.execute(sql.as_str());
1208 match state {
1209 true => data,
1210 false => {
1211 error!("delete 失败>>> {:?}", data);
1212 JsonValue::from(0)
1213 }
1214 }
1215 }
1216
1217 fn transaction(&mut self) -> bool {
1218 let thread_id = format!("{:?}", thread::current().id());
1219
1220 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1221 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1222 t += 1;
1223 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1224 return true;
1225 }
1226
1227 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1228 let key = format!("{}{}", self.default, thread_id);
1229 TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(self.client.get_connect().unwrap())));
1230
1231 let sql = "START TRANSACTION;".to_string();
1232 let (state, _) = self.execute(sql.as_str());
1233 match state {
1234 true => {
1235 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1236 let (state, _) = self.execute(sql.as_str());
1237 match state {
1238 true => state,
1239 false => {
1240 TRANS.lock().unwrap().remove(&*thread_id.clone());
1241 TR.lock().unwrap().remove(&key.clone());
1242 state
1243 }
1244 }
1245 }
1246 false => {
1247 TRANS.lock().unwrap().remove(&*thread_id.clone());
1248 TR.lock().unwrap().remove(&key.clone());
1249 state
1250 }
1251 }
1252 }
1253 fn commit(&mut self) -> bool {
1254 let thread_id = format!("{:?}", thread::current().id());
1255 let sql = "COMMIT".to_string();
1256
1257 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1258 if t > 1 {
1259 t -= 1;
1260 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1261 return true;
1262 }
1263 let (state, data) = self.execute(sql.as_str());
1264 TRANS.lock().unwrap().remove(&thread_id);
1265 let key = format!("{}{}", self.default, thread_id);
1266 TR.lock().unwrap().remove(&*key);
1267 match state {
1268 true => {}
1269 false => {
1270 error!("提交事务失败: {}", data);
1271 }
1272 }
1273 state
1274 }
1275
1276 fn rollback(&mut self) -> bool {
1277 let thread_id = format!("{:?}", thread::current().id());
1278 let sql = "ROLLBACK".to_string();
1279
1280 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1281 if t > 1 {
1282 t -= 1;
1283 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1284 return true;
1285 }
1286 let (state, data) = self.execute(sql.as_str());
1287 TRANS.lock().unwrap().remove(&thread_id);
1288 let key = format!("{}{}", self.default, thread_id);
1289 TR.lock().unwrap().remove(&*key);
1290 match state {
1291 true => {}
1292 false => {
1293 error!("回滚失败: {}", data);
1294 }
1295 }
1296 state
1297 }
1298
1299 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1300 let (state, data) = self.query(sql);
1301 match state {
1302 true => Ok(data),
1303 false => Err(data.to_string()),
1304 }
1305 }
1306
1307 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1308 let (state, data) = self.execute(sql);
1309 match state {
1310 true => Ok(data),
1311 false => Err(data.to_string()),
1312 }
1313 }
1314
1315 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1316 self.params.inc_dec[field] = format!("{} + {}", field, num).into();
1317 self
1318 }
1319
1320 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1321 self.params.inc_dec[field] = format!("{} - {}", field, num).into();
1322 self
1323 }
1324 fn buildsql(&mut self) -> String {
1325 self.fetch_sql();
1326 let sql = self.select().to_string();
1327 format!("( {} ) {}", sql, self.params.table)
1328 }
1329
1330 fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1331 let main_fields = if main_fields.is_empty() {
1332 "id"
1333 } else {
1334 main_fields
1335 };
1336 let right_fields = if right_fields.is_empty() {
1337 self.params.table.clone()
1338 } else {
1339 right_fields.to_string().clone()
1340 };
1341 self.params.join_table = table.to_string();
1342 self.params.join.push(format!(
1343 " LEFT JOIN {} ON {}.{} = {}.{}",
1344 table, self.params.table, main_fields, table, right_fields
1345 ));
1346 self
1347 }
1348
1349 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1350 let main_fields = if main_fields.is_empty() {
1351 "id"
1352 } else {
1353 main_fields
1354 };
1355 let second_fields = if second_fields.is_empty() {
1356 self.params.table.clone()
1357 } else {
1358 second_fields.to_string().clone()
1359 };
1360 let sec_table_name = format!("{}{}", table, "_2");
1361 let second_table = format!("{} {}", table, sec_table_name.clone());
1362 self.params.join_table = sec_table_name.clone();
1363 self.params.join.push(format!(
1364 " INNER JOIN {} ON {}.{} = {}.{}",
1365 second_table, self.params.table, main_fields, sec_table_name, second_fields
1366 ));
1367 self
1368 }
1369}