1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use rand::{rng, Rng};
13use br_pgsql::connect::Connect;
14use br_pgsql::pools::Pools;
15
16lazy_static! {
17 static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
18 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
19}
20#[derive(Clone)]
21pub struct Pgsql {
22 pub connection: Connection,
24 pub default: String,
26 pub params: Params,
27 pub client: Pools,
28}
29
30impl Pgsql {
31 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
32 let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
33
34 let cp_connection = connection.clone();
35 let config = object! {
36 debug: cp_connection.debug,
37 username: cp_connection.username,
38 userpass: cp_connection.userpass,
39 database: cp_connection.database,
40 hostname: cp_connection.hostname,
41 hostport: port,
42 charset: cp_connection.charset.str(),
43 };
44 let mut pgsql = br_pgsql::Pgsql::new(&config)?;
45
46 let pools = pgsql.pools()?;
47 Ok(Self {
48 connection,
49 default: default.clone(),
50 params: Params::default("pgsql"),
51 client: pools,
52 })
53 }
54
55 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
56 let thread_id = format!("{:?}", thread::current().id());
57 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
61 let key = format!("{}{}", self.default, thread_id);
63 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
64
65 let mut t = db.lock().unwrap();
66 match t.query(sql) {
67 Ok(e) => {
68 if self.connection.debug {
69 info!("查询成功: {} {}", thread_id.clone(), sql);
70 }
71 (true, e.rows)
72 }
73 Err(e) => {
74 error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
75 (false, JsonValue::from(e.to_string()))
76 }
77 }
78 } else {
79 let mut guard = match self.client.get_guard() {
81 Ok(g) => g,
82 Err(e) => {
83 error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
84 return (false, JsonValue::from(e.to_string()));
85 }
86 };
87
88 let res = guard.conn().query(sql);
89 match res {
90 Ok(e) => {
91 if self.connection.debug {
92 info!("查询成功: {} {}", thread_id.clone(), sql);
93 }
94 (true, e.rows)
95 }
96 Err(e) => {
97 error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
98 (false, JsonValue::from(e.to_string()))
99 }
100 }
101 }
103 }
104 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
105 let thread_id = format!("{:?}", thread::current().id());
106
107 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
108 let key = format!("{}{}", self.default, thread_id);
109 let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
110 let mut t = db.lock().unwrap();
111 match t.execute(sql) {
112 Ok(e) => {
113 if self.connection.debug {
114 info!("提交成功: {} {}", thread_id.clone(), sql);
115 }
116 if sql.contains("INSERT") {
117 (true, e.rows)
118 } else {
119 (true, e.affect_count.into())
120 }
121 }
122 Err(e) => {
123 error!("事务提交失败: {thread_id} {e} {sql}");
124 (false, JsonValue::from(e.to_string()))
125 }
126 }
127 } else {
128 let mut guard = match self.client.get_guard() {
130 Ok(g) => g,
131 Err(e) => {
132 error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
133 return (false, JsonValue::from(e.to_string()));
134 }
135 };
136
137 let res = guard.conn().execute(sql);
138 match res {
139 Ok(e) => {
140 if self.connection.debug {
141 info!("提交成功: {} {}", thread_id.clone(), sql);
142 }
143 if sql.contains("INSERT") {
144 (true, e.rows)
145 } else {
146 (true, e.affect_count.into())
147 }
148 }
149 Err(e) => {
150 error!("非事务提交失败: {thread_id} {e} {sql}");
151 (false, JsonValue::from(e.to_string()))
152 }
153 }
154 }
156 }
157}
158
159impl DbMode for Pgsql {
160 fn database_tables(&mut self) -> JsonValue {
161 let sql = "SHOW TABLES".to_string();
162 match self.sql(sql.as_str()) {
163 Ok(e) => {
164 let mut list = vec![];
165 for item in e.members() {
166 for (_, value) in item.entries() {
167 list.push(value.clone());
168 }
169 }
170 list.into()
171 }
172 Err(_) => {
173 array![]
174 }
175 }
176 }
177
178 fn database_create(&mut self, name: &str) -> bool {
179 let sql = format!("CREATE DATABASE {name}");
180
181 let (state, data) = self.execute(sql.as_str());
182 match state {
183 true => data.as_bool().unwrap(),
184 false => {
185 error!("创建数据库失败: {data:?}");
186 false
187 }
188 }
189 }
190}
191
192impl Mode for Pgsql {
193 fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
194 let mut sql = String::new();
195 let mut comments = vec![];
196
197 if !options.table_unique.is_empty() {
199 let unique = format!(
200 "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
201 options.table_unique.join("_"),
202 options.table_name,
203 options.table_unique.join(",")
204 );
205 comments.push(unique);
206 }
207
208 for row in options.table_index.iter() {
210 let index = format!(
211 "CREATE INDEX {01}_index_{} ON {} ({})",
212 row.join("_"),
213 options.table_name,
214 row.join(",")
215 );
216 comments.push(index);
217 }
218
219 for (name, field) in options.table_fields.entries_mut() {
220 field["table_name"] = options.table_name.clone().into();
221 let row = br_fields::field("pgsql", name, field.clone());
222 let rows = row.split("comment").collect::<Vec<&str>>();
223 comments.push(format!(
224 "COMMENT ON COLUMN {}.{} IS {};",
225 options.table_name, name, rows[1]
226 ));
227 sql = format!("{} {},\r\n", sql, rows[0]);
228 }
229
230 let primary_key = format!(
231 "CONSTRAINT {}_{} PRIMARY KEY ({})",
232 options.table_name, options.table_key, options.table_key
233 );
234 let sql = format!(
235 "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
236 options.table_name,
237 sql.trim_end_matches(",\r\n"),
238 primary_key
239 );
240 comments.insert(0, sql);
241
242 for (_name, field) in options.table_fields.entries() {
243 field["mode"].as_str().unwrap();
244 {}
245 }
246
247 if self.params.sql {
248 let info = comments.join("\r\n");
249 return JsonValue::from(info);
250 }
251 for comment in comments {
252 let (state, _) = self.execute(comment.as_str());
253 match state {
254 true => {}
255 false => {
256 return JsonValue::from(state);
257 }
258 }
259 }
260 JsonValue::from(true)
261 }
262
263 fn table_update(&mut self, options: TableOptions) -> JsonValue {
264 let fields_list = self.table_info(&options.table_name);
265 let mut put = vec![];
266 let mut add = vec![];
267 let mut del = vec![];
268 let mut comments = vec![];
269
270 for (key, _) in fields_list.entries() {
271 if options.table_fields[key].is_empty() {
272 del.push(key);
273 }
274 }
275 for (name, field) in options.table_fields.entries() {
276 if !fields_list[name].is_empty() {
277 let old_comment = fields_list[name]["comment"].to_string();
278 let new_comment = br_fields::field("pgsql", name, field.clone());
279 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
280 let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
281 if old_comment == new_comment_text {
282 continue;
283 }
284 put.push(name);
285 } else {
286 add.push(name);
287 }
288 }
289
290 for name in add.iter() {
291 let name = name.to_string();
292 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
293 let rows = row.split("comment").collect::<Vec<&str>>();
294 comments.push(format!(
295 r#"ALTER TABLE "{}" add {};"#,
296 options.table_name, rows[0]
297 ));
298 comments.push(format!(
299 "COMMENT ON COLUMN {}.{} IS {};",
300 options.table_name, name, rows[1]
301 ));
302 }
303 for name in del.iter() {
304 comments.push(format!(
305 "ALTER TABLE {} DROP {};\r\n",
306 options.table_name, name
307 ));
308 }
309 for name in put.iter() {
310 let name = name.to_string();
311 let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
312 let rows = row.split("comment").collect::<Vec<&str>>();
313
314 let sql = rows[0].split(" ").collect::<Vec<&str>>();
315
316 if sql[1].contains("BOOLEAN") {
317 let text = format!(
318 "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
319 options.table_name, name
320 );
321 comments.push(text.clone());
322 let text = format!(
323 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
324 options.table_name, name, sql[1]
325 );
326 comments.push(text.clone());
327 } else {
328 let text = format!(
329 "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
330 options.table_name, name, sql[1]
331 );
332 comments.push(text.clone());
333 };
334
335 if sql.len() > 3 && !sql[3].is_empty() {
336 let text = format!(
337 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
338 options.table_name, name, sql[3]
339 );
340 comments.push(text.clone());
341 }
342
343 comments.push(format!(
344 "COMMENT ON COLUMN {}.{} IS {};",
345 options.table_name, name, rows[1]
346 ));
347 }
348
349 let mut unique_new = vec![];
350 let mut index_new = vec![];
351 let mut primary_key = vec![];
352 let (_, index_list) = self.query(
353 format!(
354 "SELECT * FROM pg_indexes WHERE tablename = '{}'",
355 options.table_name
356 ).as_str(),
357 );
358 for item in index_list.members() {
359 let key_name = item["indexname"].as_str().unwrap();
360 let indexdef = item["indexdef"].to_string();
361
362 if indexdef.contains(
363 format!(
364 "CREATE UNIQUE INDEX {}_{} ON",
365 options.table_name, options.table_key
366 ).as_str(),
367 ) {
368 primary_key.push(key_name.to_string());
369 continue;
370 }
371 if indexdef.contains("CREATE UNIQUE INDEX") {
372 unique_new.push(key_name.to_string());
373 continue;
374 }
375 if indexdef.contains("CREATE INDEX") {
376 index_new.push(key_name.to_string());
377 continue;
378 }
379 }
380
381 if !options.table_unique.is_empty() {
382 let name = format!(
383 "{}_unique_{}",
384 options.table_name,
385 options.table_unique.join("_")
386 );
387 let unique = format!(
388 "CREATE UNIQUE INDEX {} ON {} ({});",
389 name,
390 options.table_name,
391 options.table_unique.join(",")
392 );
393 if !unique_new.contains(&name) {
394 comments.push(unique);
395 } else {
396 unique_new.retain(|x| *x != name);
397 }
398 }
399
400 for row in options.table_index.iter() {
402 let name = format!("{}_index_{}", options.table_name, row.join("_"));
403 let index = format!(
404 "CREATE INDEX {} ON {} ({})",
405 name,
406 options.table_name,
407 row.join(",")
408 );
409 if !index_new.contains(&name) {
410 comments.push(index);
411 } else {
412 index_new.retain(|x| *x != name);
413 }
414 }
415
416 for item in unique_new {
417 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
418 }
419 for item in index_new {
420 comments.push(format!("DROP INDEX {};\r\n", item.clone()));
421 }
422
423 if self.params.sql {
424 return JsonValue::from(comments.join(""));
425 }
426
427 if comments.is_empty() {
428 return JsonValue::from(-1);
429 }
430
431 for item in comments.iter() {
432 let (state, res) = self.execute(item.as_str());
433 match state {
434 true => {}
435 false => {
436 error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
437 return JsonValue::from(0);
438 }
439 }
440 }
441 JsonValue::from(1)
442 }
443
444 fn table_info(&mut self, table: &str) -> JsonValue {
445 let sql = format!(
446 "SELECT COL.COLUMN_NAME,
447 COL.DATA_TYPE,
448 COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
449 LEFT JOIN
450 pg_catalog.pg_description DESCRIPTION
451 ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
452 AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
453 let (state, data) = self.query(sql.as_str());
454 let mut list = object! {};
455 if state {
456 for item in data.members() {
457 let mut row = object! {};
458 row["field"] = item["column_name"].clone();
459 row["comment"] = item["comment"].clone();
460 row["type"] = item["data_type"].clone();
461 list[row["field"].as_str().unwrap()] = row.clone();
462 }
463 list
464 } else {
465 list
466 }
467 }
468
469 fn table_is_exist(&mut self, name: &str) -> bool {
470 let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
471 let (state, data) = self.query(sql.as_str());
472 match state {
473 true => {
474 for item in data.members() {
475 if item.has_key("exists") {
476 return item["exists"].as_bool().unwrap();
477 }
478 }
479 false
480 }
481 false => false,
482 }
483 }
484
485 fn table(&mut self, name: &str) -> &mut Pgsql {
486 self.params = Params::default(self.connection.mode.str().as_str());
487 self.params.table = format!("{}{}", self.connection.prefix, name);
488 self.params.join_table = self.params.table.clone();
489 self
490 }
491
492 fn change_table(&mut self, name: &str) -> &mut Self {
493 self.params.join_table = name.to_string();
494 self
495 }
496
497 fn autoinc(&mut self) -> &mut Self {
498 self.params.autoinc = true;
499 self
500 }
501
502 fn fetch_sql(&mut self) -> &mut Self {
503 self.params.sql = true;
504 self
505 }
506
507 fn order(&mut self, field: &str, by: bool) -> &mut Self {
508 self.params.order[field] = {
509 if by {
510 "DESC"
511 } else {
512 "ASC"
513 }
514 }.into();
515 self
516 }
517
518 fn group(&mut self, field: &str) -> &mut Self {
519 let fields: Vec<&str> = field.split(",").collect();
520 for field in fields.iter() {
521 let field = field.to_string();
522 self.params.group[field.as_str()] = field.clone().into();
523 self.params.fields[field.as_str()] = field.clone().into();
524 }
525 self
526 }
527
528 fn distinct(&mut self) -> &mut Self {
529 self.params.distinct = true;
530 self
531 }
532
533 fn json(&mut self, field: &str) -> &mut Self {
534 let list: Vec<&str> = field.split(",").collect();
535 for item in list.iter() {
536 self.params.json[item.to_string().as_str()] = item.to_string().into();
537 }
538 self
539 }
540
541 fn location(&mut self, field: &str) -> &mut Self {
542 let list: Vec<&str> = field.split(",").collect();
543 for item in list.iter() {
544 self.params.location[item.to_string().as_str()] = item.to_string().into();
545 }
546 self
547 }
548
549 fn field(&mut self, field: &str) -> &mut Self {
550 let list: Vec<&str> = field.split(",").collect();
551 let join_table = if self.params.join_table.is_empty() {
552 self.params.table.clone()
553 } else {
554 self.params.join_table.clone()
555 };
556 for item in list.iter() {
557 if item.contains(" as ") {
558 let text = item.split(" as ").collect::<Vec<&str>>();
559 if text[0].contains("count(") {
560 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
561 } else {
562 self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
563 }
564 } else {
565 self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
566 }
567 }
568 self
569 }
570
571 fn hidden(&mut self, name: &str) -> &mut Self {
572 let hidden: Vec<&str> = name.split(",").collect();
573
574 let fields_list = self.table_info(self.params.clone().table.as_str());
575 let mut data = array![];
576 for item in fields_list.members() {
577 data.push(object! {
578 "name":item["field"].as_str().unwrap()
579 }).unwrap();
580 }
581
582 for item in data.members() {
583 let name = item["name"].as_str().unwrap();
584 if !hidden.contains(&name) {
585 self.params.fields[name] = name.into();
586 }
587 }
588 self
589 }
590
591 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
592 let join_table = if self.params.join_table.is_empty() {
593 self.params.table.clone()
594 } else {
595 self.params.join_table.clone()
596 };
597 if value.is_boolean() {
598 if value.as_bool().unwrap() {
599 value = 1.into();
600 } else {
601 value = 0.into();
602 }
603 }
604 match compare {
605 "between" => {
606 self.params.where_and.push(format!(
607 "{}.{} between '{}' AND '{}'",
608 join_table, field, value[0], value[1]
609 ));
610 }
611 "set" => {
612 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
613 let mut wheredata = vec![];
614 for item in list.iter() {
615 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
616 }
617 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
618 }
619 "notin" => {
620 let mut text = String::new();
621 for item in value.members() {
622 text = format!("{text},'{item}'");
623 }
624 text = text.trim_start_matches(",").into();
625 self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
626 }
627 "is" => {
628 self.params.where_and.push(format!("{join_table}.{field} is {value}"));
629 }
630 "notlike" => {
631 self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
632 }
633 "in" => {
634 let mut text = String::new();
635 if value.is_array() {
636 for item in value.members() {
637 text = format!("{text},'{item}'");
638 }
639 } else if value.is_null() {
640 text = format!("{text},null");
641 } else {
642 let value = value.as_str().unwrap();
643
644 let value: Vec<&str> = value.split(",").collect();
645 for item in value.iter() {
646 text = format!("{text},'{item}'");
647 }
648 }
649 text = text.trim_start_matches(",").into();
650
651 self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
652 }
653 _ => {
654 self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
655 }
656 }
657 self
658 }
659
660 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
661 let join_table = if self.params.join_table.is_empty() {
662 self.params.table.clone()
663 } else {
664 self.params.join_table.clone()
665 };
666
667 if value.is_boolean() {
668 if value.as_bool().unwrap() {
669 value = 1.into();
670 } else {
671 value = 0.into();
672 }
673 }
674
675 match compare {
676 "between" => {
677 self.params.where_or.push(format!(
678 "{}.{} between '{}' AND '{}'",
679 join_table, field, value[0], value[1]
680 ));
681 }
682 "set" => {
683 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
684 let mut wheredata = vec![];
685 for item in list.iter() {
686 wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
687 }
688 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
689 }
690 "notin" => {
691 let mut text = String::new();
692 for item in value.members() {
693 text = format!("{text},'{item}'");
694 }
695 text = text.trim_start_matches(",").into();
696 self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
697 }
698 "in" => {
699 let mut text = String::new();
700 if value.is_array() {
701 for item in value.members() {
702 text = format!("{text},'{item}'");
703 }
704 } else {
705 let value = value.as_str().unwrap();
706 let value: Vec<&str> = value.split(",").collect();
707 for item in value.iter() {
708 text = format!("{text},'{item}'");
709 }
710 }
711 text = text.trim_start_matches(",").into();
712 self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
713 }
714 _ => {
715 self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
716 }
717 }
718 self
719 }
720
721 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
722 self.params.where_column = format!(
723 "{}.{} {} {}.{}",
724 self.params.table, field_a, compare, self.params.table, field_b
725 );
726 self
727 }
728
729 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
730 self.params.page = page;
731 self.params.limit = limit;
732 self
733 }
734
735 fn column(&mut self, field: &str) -> JsonValue {
736 self.field(field);
737 let sql = self.params.select_sql();
738
739 if self.params.sql {
740 return JsonValue::from(sql);
741 }
742 let (state, data) = self.query(sql.as_str());
743 match state {
744 true => {
745 let mut list = array![];
746 for item in data.members() {
747 if self.params.json[field].is_empty() {
748 list.push(item[field].clone()).unwrap();
749 } else {
750 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
751 list.push(data).unwrap();
752 }
753 }
754 list
755 }
756 false => {
757 array![]
758 }
759 }
760 }
761
762 fn count(&mut self) -> JsonValue {
763 self.params.fields["count"] = "count(*) as count".to_string().into();
764 let sql = self.params.select_sql();
765 if self.params.sql {
766 return JsonValue::from(sql.clone());
767 }
768 let (state, data) = self.query(sql.as_str());
769 if state {
770 data[0]["count"].clone()
771 } else {
772 JsonValue::from(0)
773 }
774 }
775
776 fn max(&mut self, field: &str) -> JsonValue {
777 self.params.fields[field] = format!("max({field}) as {field}").into();
778 let sql = self.params.select_sql();
779 if self.params.sql {
780 return JsonValue::from(sql.clone());
781 }
782 let (state, data) = self.query(sql.as_str());
783 if state {
784 if data.len() > 1 {
785 return data.clone();
786 }
787 data[0][field].clone()
788 } else {
789 JsonValue::from(0)
790 }
791 }
792
793 fn min(&mut self, field: &str) -> JsonValue {
794 self.params.fields[field] = format!("min({field}) as {field}").into();
795 let sql = self.params.select_sql();
796 if self.params.sql {
797 return JsonValue::from(sql.clone());
798 }
799 let (state, data) = self.query(sql.as_str());
800 if state {
801 if data.len() > 1 {
802 return data;
803 }
804 data[0][field].clone()
805 } else {
806 JsonValue::from(0)
807 }
808 }
809
810 fn sum(&mut self, field: &str) -> JsonValue {
811 self.params.fields[field] = format!("sum({field}) as {field}").into();
812 let sql = self.params.select_sql();
813 if self.params.sql {
814 return JsonValue::from(sql.clone());
815 }
816 let (state, data) = self.query(sql.as_str());
817 match state {
818 true => {
819 if data.len() > 1 {
820 return data;
821 }
822 data[0][field].clone()
823 }
824 false => JsonValue::from(0),
825 }
826 }
827
828 fn avg(&mut self, field: &str) -> JsonValue {
829 self.params.fields[field] = format!("avg({field}) as {field}").into();
830 let sql = self.params.select_sql();
831 if self.params.sql {
832 return JsonValue::from(sql.clone());
833 }
834 let (state, data) = self.query(sql.as_str());
835 if state {
836 if data.len() > 1 {
837 return data;
838 }
839 data[0][field].clone()
840 } else {
841 JsonValue::from(0)
842 }
843 }
844
845 fn select(&mut self) -> JsonValue {
846 let sql = self.params.select_sql();
847 if self.params.sql {
848 return JsonValue::from(sql.clone());
849 }
850 let (state, mut data) = self.query(sql.as_str());
851 match state {
852 true => {
853 for (field, _) in self.params.json.entries() {
854 for item in data.members_mut() {
855 if !item[field].is_empty() {
856 let json = item[field].to_string();
857 item[field] = match json::parse(&json) {
858 Ok(e) => e,
859 Err(_) => JsonValue::from(json),
860 };
861 }
862 }
863 }
864 data.clone()
865 }
866 false => array![],
867 }
868 }
869
870 fn find(&mut self) -> JsonValue {
871 self.params.page = 1;
872 self.params.limit = 1;
873 let sql = self.params.select_sql();
874 if self.params.sql {
875 return JsonValue::from(sql.clone());
876 }
877 let (state, mut data) = self.query(sql.as_str());
878 match state {
879 true => {
880 if data.is_empty() {
881 return object! {};
882 }
883 for (field, _) in self.params.json.entries() {
884 if !data[0][field].is_empty() {
885 let json = data[0][field].to_string();
886 let json = json::parse(&json).unwrap_or(array![]);
887 data[0][field] = json;
888 } else {
889 data[0][field] = array![];
890 }
891 }
892 data[0].clone()
893 }
894 false => {
895 object! {}
896 }
897 }
898 }
899
900 fn value(&mut self, field: &str) -> JsonValue {
901 self.params.fields = object! {};
902 self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
903 self.params.page = 1;
904 self.params.limit = 1;
905 let sql = self.params.select_sql();
906 if self.params.sql {
907 return JsonValue::from(sql.clone());
908 }
909 let (state, mut data) = self.query(sql.as_str());
910 match state {
911 true => {
912 for (field, _) in self.params.json.entries() {
913 if !data[0][field].is_empty() {
914 let json = data[0][field].to_string();
915 let json = json::parse(&json).unwrap_or(array![]);
916 data[0][field] = json;
917 } else {
918 data[0][field] = array![];
919 }
920 }
921 data[0][field].clone()
922 }
923 false => {
924 if self.connection.debug {
925 info!("{data:?}");
926 }
927 JsonValue::Null
928 }
929 }
930 }
931
932 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
933 let mut fields = vec![];
934 let mut values = vec![];
935 if !self.params.autoinc && data["id"].is_empty() {
936 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
937 }
938 for (field, value) in data.entries() {
939 fields.push(field);
940
941 if value.is_string() {
942 values.push(format!("'{}'", value.to_string().replace("'", "''")));
943 continue;
944 } else if value.is_array() {
945 if self.params.json[field].is_empty() {
946 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
947 values.push(format!("'{array}'"));
948 } else {
949 let json = value.to_string();
950 let json = json.replace("'", "''");
951 values.push(format!("'{json}'"));
952 }
953 continue;
954 } else if value.is_object() {
955 if self.params.json[field].is_empty() {
956 values.push(format!("'{value}'"));
957 } else {
958 let json = value.to_string();
959 let json = json.replace("'", "''");
960 values.push(format!("'{json}'"));
961 }
962 continue;
963 } else if value.is_number() || value.is_boolean() || value.is_null() {
964 values.push(format!("{value}"));
965 continue;
966 } else {
967 values.push(format!("'{value}'"));
968 continue;
969 }
970 }
971 let fields = fields.join(",");
972 let values = values.join(",");
973
974 let sql = format!(
975 "INSERT INTO {} ({}) VALUES ({});",
976 self.params.table, fields, values
977 );
978 if self.params.sql {
979 return JsonValue::from(sql.clone());
980 }
981 let (state, ids) = self.execute(sql.as_str());
982
983 match state {
984 true => match self.params.autoinc {
985 true => ids.clone(),
986 false => data["id"].clone(),
987 },
988 false => {
989 let thread_id = format!("{:?}", thread::current().id());
990 error!("添加失败: {thread_id} {ids:?} {sql}");
991 JsonValue::from("")
992 }
993 }
994 }
995 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
996 let mut fields = String::new();
997 if !self.params.autoinc && data[0]["id"].is_empty() {
998 data[0]["id"] = "".into();
999 }
1000 for (field, _) in data[0].entries() {
1001 fields = format!("{fields},{field}");
1002 }
1003 fields = fields.trim_start_matches(",").parse().unwrap();
1004
1005 let core_count = num_cpus::get();
1006 let mut p = pools::Pool::new(core_count * 4);
1007
1008 let mut rng = rng();
1009 let i: i32 = rng.random_range(1..=100);
1010 let autoinc = self.params.autoinc;
1011 for list in data.members() {
1012 let mut item = list.clone();
1013 p.execute(move |pcindex| {
1014 if !autoinc && item["id"].is_empty() {
1015 let id = format!(
1016 "{:X}{:X}{}",
1017 Local::now().timestamp_nanos_opt().unwrap(),
1018 pcindex,
1019 i
1020 );
1021 item["id"] = id.into();
1022 }
1023 let mut values = "".to_string();
1024 for (_, value) in item.entries() {
1025 if value.is_string() {
1026 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1027 } else if value.is_number() {
1028 values = format!("{values},{value}");
1029 } else if value.is_boolean() {
1030 values = format!("{values},{value}");
1031 continue;
1032 } else {
1033 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1034 }
1035 }
1036 values = format!("({})", values.trim_start_matches(","));
1037 array![item["id"].clone(), values]
1038 });
1039 }
1040 let (ids_list, mut values) = p.insert_all();
1041 values = values.trim_start_matches(",").parse().unwrap();
1042 let sql = format!(
1043 "INSERT INTO {} ({}) VALUES {};",
1044 self.params.table, fields, values
1045 );
1046
1047 if self.params.sql {
1048 return JsonValue::from(sql.clone());
1049 }
1050 let (state, data) = self.execute(sql.as_str());
1051 match state {
1052 true => match autoinc {
1053 true => data,
1054 false => JsonValue::from(ids_list),
1055 },
1056 false => {
1057 error!("insert_all: {data:?}");
1058 array![]
1059 }
1060 }
1061 }
1062 fn update(&mut self, data: JsonValue) -> JsonValue {
1063 let mut values = vec![];
1064 for (field, value) in data.entries() {
1065 if value.is_string() {
1066 values.push(format!(
1067 "{}='{}'",
1068 field,
1069 value.to_string().replace("'", "''")
1070 ));
1071 } else if value.is_number() {
1072 values.push(format!("{field}= {value}"));
1073 } else if value.is_array() {
1074 if self.params.json[field].is_empty() {
1075 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1076 values.push(format!("{field}='{array}'"));
1077 } else {
1078 let json = value.to_string();
1079 let json = json.replace("'", "''");
1080 values.push(format!("{field}='{json}'"));
1081 }
1082 continue;
1083 } else if value.is_object() {
1084 if self.params.json[field].is_empty() {
1085 values.push(format!("{field}='{value}'"));
1086 } else {
1087 if value.is_empty() {
1088 values.push(format!("{field}=''"));
1089 continue;
1090 }
1091 let json = value.to_string();
1092 let json = json.replace("'", "''");
1093 values.push(format!("{field}='{json}'"));
1094 }
1095 continue;
1096 } else if value.is_boolean() {
1097 values.push(format!("{field}= {value}"));
1098 } else {
1099 values.push(format!("{field}=\"{value}\""));
1100 }
1101 }
1102
1103 for (field, value) in self.params.inc_dec.entries() {
1104 values.push(format!("{} = {}", field, value.to_string().clone()));
1105 }
1106
1107 let values = values.join(",");
1108
1109 let sql = format!(
1110 "UPDATE {} SET {} {};",
1111 self.params.table.clone(),
1112 values,
1113 self.params.where_sql()
1114 );
1115 if self.params.sql {
1116 return JsonValue::from(sql.clone());
1117 }
1118 let (state, data) = self.execute(sql.as_str());
1119 if state {
1120 data
1121 } else {
1122 let thread_id = format!("{:?}", thread::current().id());
1123 error!("update: {thread_id} {data:?} {sql}");
1124 0.into()
1125 }
1126 }
1127 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1128 let mut values = vec![];
1129
1130 let mut ids = vec![];
1131 for (field, _) in data[0].entries() {
1132 if field == "id" {
1133 continue;
1134 }
1135 let mut fields = vec![];
1136 for row in data.members() {
1137 let value = row[field].clone();
1138 let id = row["id"].clone();
1139 ids.push(id.clone());
1140 if value.is_string() {
1141 fields.push(format!(
1142 "WHEN '{}' THEN '{}'",
1143 id,
1144 value.to_string().replace("'", "''")
1145 ));
1146 } else if value.is_array() || value.is_object() {
1147 if self.params.json[field].is_empty() {
1148 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1149 } else {
1150 let json = value.to_string();
1151 let json = json.replace("'", "''");
1152 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1153 }
1154 continue;
1155 } else if value.is_number() || value.is_boolean() || value.is_null() {
1156 fields.push(format!("WHEN '{id}' THEN {value}"));
1157 } else {
1158 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1159 }
1160 }
1161 values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1162 }
1163 self.where_and("id", "in", ids.into());
1164 for (field, value) in self.params.inc_dec.entries() {
1165 values.push(format!("{} = {}", field, value.to_string().clone()));
1166 }
1167
1168 let values = values.join(",");
1169 let sql = format!(
1170 "UPDATE {} SET {} {} {};",
1171 self.params.table.clone(),
1172 values,
1173 self.params.where_sql(),
1174 self.params.page_limit_sql()
1175 );
1176 if self.params.sql {
1177 return JsonValue::from(sql.clone());
1178 }
1179 let (state, data) = self.execute(sql.as_str());
1180 if state {
1181 data
1182 } else {
1183 error!("update_all: {data:?}");
1184 JsonValue::from(0)
1185 }
1186 }
1187
1188 fn delete(&mut self) -> JsonValue {
1189 let sql = format!(
1190 "delete FROM {} {} {};",
1191 self.params.table.clone(),
1192 self.params.where_sql(),
1193 self.params.page_limit_sql()
1194 );
1195 if self.params.sql {
1196 return JsonValue::from(sql.clone());
1197 }
1198 let (state, data) = self.execute(sql.as_str());
1199 match state {
1200 true => data,
1201 false => {
1202 error!("delete 失败>>> {data:?}");
1203 JsonValue::from(0)
1204 }
1205 }
1206 }
1207
1208 fn transaction(&mut self) -> bool {
1209 let thread_id = format!("{:?}", thread::current().id());
1210
1211 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1212 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1213 t += 1;
1214 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1215 return true;
1216 }
1217
1218 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1219 let key = format!("{}{}", self.default, thread_id);
1220
1221 let mut guard = match self.client.get_guard() {
1223 Ok(g) => g,
1224 Err(e) => {
1225 error!("获取事务连接失败: {e}");
1226 return false;
1227 }
1228 };
1229
1230 let conn = guard.conn().clone();
1231
1232 drop(guard);
1234
1235 TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1236
1237 let sql = "START TRANSACTION;".to_string();
1238 let (state, _) = self.execute(sql.as_str());
1239 match state {
1240 true => {
1241 let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1242 let (state, _) = self.execute(sql.as_str());
1243 match state {
1244 true => state,
1245 false => {
1246 TRANS.lock().unwrap().remove(&*thread_id.clone());
1247 TR.lock().unwrap().remove(&key.clone());
1248 state
1249 }
1250 }
1251 }
1252 false => {
1253 TRANS.lock().unwrap().remove(&*thread_id.clone());
1254 TR.lock().unwrap().remove(&key.clone());
1255 state
1256 }
1257 }
1258 }
1259 fn commit(&mut self) -> bool {
1260 let thread_id = format!("{:?}", thread::current().id());
1261 let sql = "COMMIT".to_string();
1262
1263 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1264 if t > 1 {
1265 t -= 1;
1266 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1267 return true;
1268 }
1269 let (state, data) = self.execute(sql.as_str());
1270 TRANS.lock().unwrap().remove(&thread_id);
1271 let key = format!("{}{}", self.default, thread_id);
1272 TR.lock().unwrap().remove(&*key);
1273 match state {
1274 true => {}
1275 false => {
1276 error!("提交事务失败: {data}");
1277 }
1278 }
1279 state
1280 }
1281
1282 fn rollback(&mut self) -> bool {
1283 let thread_id = format!("{:?}", thread::current().id());
1284 let sql = "ROLLBACK".to_string();
1285
1286 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1287 if t > 1 {
1288 t -= 1;
1289 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1290 return true;
1291 }
1292 let (state, data) = self.execute(sql.as_str());
1293 TRANS.lock().unwrap().remove(&thread_id);
1294 let key = format!("{}{}", self.default, thread_id);
1295 TR.lock().unwrap().remove(&*key);
1296 match state {
1297 true => {}
1298 false => {
1299 error!("回滚失败: {data}");
1300 }
1301 }
1302 state
1303 }
1304
1305 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1306 let (state, data) = self.query(sql);
1307 match state {
1308 true => Ok(data),
1309 false => Err(data.to_string()),
1310 }
1311 }
1312
1313 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1314 let (state, data) = self.execute(sql);
1315 match state {
1316 true => Ok(data),
1317 false => Err(data.to_string()),
1318 }
1319 }
1320
1321 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1322 self.params.inc_dec[field] = format!("{field} + {num}").into();
1323 self
1324 }
1325
1326 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1327 self.params.inc_dec[field] = format!("{field} - {num}").into();
1328 self
1329 }
1330 fn buildsql(&mut self) -> String {
1331 self.fetch_sql();
1332 let sql = self.select().to_string();
1333 format!("( {} ) {}", sql, self.params.table)
1334 }
1335
1336 fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1337 let main_table = if self.params.join_table.is_empty() {
1338 self.params.table.clone()
1339 } else {
1340 self.params.join_table.clone()
1341 };
1342 self.params.join_table = table.to_string();
1343 self.params.join.push(format!(" LEFT JOIN {table} ON {main_table}.{main_fields} = {table}.{right_fields}"));
1344 self
1345 }
1346
1347 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1348 let main_fields = if main_fields.is_empty() {
1349 "id"
1350 } else {
1351 main_fields
1352 };
1353 let second_fields = if second_fields.is_empty() {
1354 self.params.table.clone()
1355 } else {
1356 second_fields.to_string().clone()
1357 };
1358 let sec_table_name = format!("{}{}", table, "_2");
1359 let second_table = format!("{} {}", table, sec_table_name.clone());
1360 self.params.join_table = sec_table_name.clone();
1361 self.params.join.push(format!(
1362 " INNER JOIN {} ON {}.{} = {}.{}",
1363 second_table, self.params.table, main_fields, sec_table_name, second_fields
1364 ));
1365 self
1366 }
1367}