1use lazy_static::lazy_static;
12use std::collections::HashMap;
13use std::sync::Mutex;
14use tokio_postgres::Error;
15
16use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
17use once_cell::sync::Lazy;
18pub use pgmacro;
19pub use plugin::logic_delete::LogicDelete;
20pub use plugin::logic_delete::LogicDeletePlugin;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::fmt::Debug;
24use tokio::sync::OnceCell;
25pub use tokio_postgres;
26pub use tokio_postgres::{types::ToSql, Client, NoTls, Row};
27pub mod plugin;
28pub mod wrapper;
29pub use wrapper::Wrapper;
30pub mod unit;
31use tracing::{debug, error, info, trace};
32
33static EMPTY_STRING: &'static str = r#""#;
36pub const LOG_SPACE: &'static str =
37 " ";
38lazy_static! {
39 static ref EXCLUSION_TABLE: Mutex<Vec<String>> = {
40 Mutex::new(Vec::new())
41 };
43}
44
45static DATABASE_POOL: Lazy<Pgbatis> = Lazy::new(|| {
46 let mut m = Pgbatis::new();
47 let exclusion_table_m = EXCLUSION_TABLE.lock().unwrap();
48 let exclusion_table = exclusion_table_m.clone();
49 m.logic_plugin = Some(Box::new(LogicDeletePlugin::new_opt(
56 "is_deleted",
57 1,
58 0,
59 exclusion_table,
60 )));
61 m
63});
64
65pub trait Parameters {
67 fn get_table_name(prefix: Option<String>, suffix: Option<String>) -> String;
69 fn get_field_list() -> String;
70 fn gen_save(
72 &self,
73 prefix: Option<String>,
74 suffix: Option<String>,
75 ) -> Result<(String, Vec<&(dyn ToSql + Sync)>), String>;
76 fn gen_update(
78 &self,
79 prefix: Option<String>,
80 suffix: Option<String>,
81 ) -> Result<(String, Vec<&(dyn ToSql + Sync)>, u32), String>;
82 fn return_one(rows: Row) -> Self;
83 fn return_list(rows: Vec<Row>) -> Vec<Self>
84 where
85 Self: Sized,
86 {
87 let mut result_list = Vec::new();
88 for row in rows {
89 result_list.push(Self::return_one(row))
90 }
91
92 return result_list;
93 }
94}
95
/// A value that names a database column.
pub trait ColumnExt {
    /// Returns the column name; `check_row_by_column` interpolates it into its SQL.
    fn get(&self) -> &'static str;
}
99
/// Holder for the connection pool and the optional logic-delete plugin.
pub struct Pgbatis {
    /// Connection pool; empty until `link` stores one.
    pub pools: OnceCell<Pool>,
    /// Logic-delete plugin; `None` means no logic-delete filter is applied.
    pub logic_plugin: Option<Box<dyn LogicDelete>>,
}
107
108impl Pgbatis {
109 pub fn new() -> Self {
110 return Self::new_with_opt();
111 }
112
113 pub fn new_with_opt() -> Self {
115 return Self {
116 pools: OnceCell::new(),
117 logic_plugin: None,
119 };
120 }
121
122 pub fn set_logic_delete(&mut self, arg: Option<impl LogicDelete + 'static>) {
123 match arg {
124 Some(v) => {
125 self.logic_plugin = Some(Box::new(v));
126 }
127 None => {
128 self.logic_plugin = None;
129 }
130 }
131 }
133}
134
135fn trace_exec_log(sql: &str, args: &[&(dyn ToSql + Sync)]) {
136 let format = format!(
137 "Exec ==> {}\n{}[pgbatis] [{}] Args ==> {:?}",
138 &sql, LOG_SPACE, "", args
139 );
140 trace!("{}", format);
141}
142
143pub fn set_unlogic_delete_table(table_name: &str) {
146 let mut exclusion_table_m = EXCLUSION_TABLE.lock().unwrap();
147 exclusion_table_m.push(table_name.to_string());
148}
149
150pub async fn link(
151 host: &str,
152 port: u16,
153 user: &str,
154 password: &str,
155 dbname: &str,
156 max_size: usize,
157) -> Result<(), String> {
158 let mut pg_config = tokio_postgres::Config::new();
159 pg_config.host(host);
161 pg_config.port(port);
162 pg_config.user(user);
163 pg_config.password(password);
164 pg_config.dbname(dbname);
165 let mgr_config = ManagerConfig {
166 recycling_method: RecyclingMethod::Verified,
167 };
168
169 let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
170 let pool = Pool::new(mgr, max_size);
172 let _ = DATABASE_POOL.pools.set(pool);
173
174 let driver_url = format!("postgresql://{}:{}@{}/{}", user, password, host, dbname);
175 info!("数据库连接:{}", driver_url);
176 return Ok(());
181}
182
/// One page of typed query results, as returned by `fetch_page`.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct PageT<T>
where
    T: Parameters,
{
    /// Rows of the requested page.
    pub records: Vec<T>,
    /// Total number of rows matching the query, across all pages.
    pub total: u64,
    /// Number of pages.
    pub pages: u64,
    /// Page number that was requested.
    pub page_no: u64,
    /// Number of rows per page.
    pub page_size: u64,
}
/// One page of untyped query results, as returned by `query_page`.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct PageHash {
    /// Rows of the requested page, each converted to a map of JSON values.
    pub records: Vec<HashMap<String, Value>>,
    /// Total number of rows matching the query, across all pages.
    pub total: u64,
    /// Number of pages.
    pub pages: u64,
    /// Page number that was requested.
    pub page_no: u64,
    /// Number of rows per page.
    pub page_size: u64,
}
212
213pub async fn execute(sql: &String, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
214 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
215 let client = pools.get().await.unwrap();
216 let statement = client.prepare(sql.as_str()).await.unwrap();
217 let result = client.execute(&statement, ¶ms).await;
218 match result {
219 Ok(t) => {
220 trace!("execute 执行成功");
221 return Ok(t);
222 }
223 Err(e) => {
224 error!("{:?}", e);
226 return Err(e);
227 }
228 }
229}
230
231pub async fn save<T>(p: T, prefix: Option<String>, suffix: Option<String>) -> Result<u64, Error>
235where
236 T: Parameters,
237{
238 let (sql, args) = p.gen_save(prefix, suffix).unwrap();
239 trace_exec_log(&sql, &args);
241 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
243 let client = pools.get().await.unwrap();
244 let statement = client.prepare(sql.as_str()).await.unwrap();
245 let result = client.execute(&statement, &args).await;
246 match result {
247 Ok(t) => {
248 trace!("SAVE 执行成功");
249 return Ok(t);
250 }
251 Err(e) => {
252 error!("{:?}", e);
254 return Err(e);
255 }
256 }
257}
258pub async fn update<T>(
262 p: T,
263 wrapper: Wrapper<'_>,
264 prefix: Option<String>,
265 suffix: Option<String>,
266) -> Result<u64, Error>
267where
268 T: Parameters,
269{
270 let (mut sql, mut args, args_number) = p.gen_update(prefix.clone(), suffix.clone()).unwrap();
272
273 let table_name = T::get_table_name(prefix, suffix);
275 let logic_plugin_sql = get_logic_undelete(&table_name);
276
277 let (where_sql, mut where_args) = wrapper.build(args_number).unwrap();
278 if !where_sql.is_empty() && !logic_plugin_sql.is_empty() {
279 sql = format!("{} WHERE {} AND {}", sql, where_sql, logic_plugin_sql);
280 } else if !where_sql.is_empty() && logic_plugin_sql.is_empty() {
281 sql = format!("{} WHERE {} ", sql, where_sql);
282 } else if where_sql.is_empty() && !logic_plugin_sql.is_empty() {
283 sql = format!("{} WHERE {} ", sql, logic_plugin_sql);
284 }
285 args.append(&mut where_args);
286 trace_exec_log(&sql, &args);
288 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
290 let client = pools.get().await.unwrap();
291 let statement = client.prepare(sql.as_str()).await.unwrap();
292 let result = client.execute(&statement, &args).await;
293
294 match result {
295 Ok(t) => {
296 return Ok(t);
298 }
299 Err(e) => {
300 error!("{:?}", e);
302 return Err(e);
303 }
304 }
305}
306
307pub async fn remove<P>(
309 wrapper: Wrapper<'_>,
310 prefix: Option<String>,
311 suffix: Option<String>,
312) -> Result<u64, Error>
313where
314 P: Parameters,
315{
316 let table_name = P::get_table_name(prefix, suffix);
318 let mut sql = format!("DELETE FROM \"{}\"", &table_name);
319
320 if DATABASE_POOL.logic_plugin.is_some() {
323 if !DATABASE_POOL
325 .logic_plugin
326 .as_ref()
327 .unwrap()
328 .is_exclusion_table(table_name.as_str())
329 {
330 sql = format!(
331 "UPDATE \"{}\" set \"{}\" = {} ",
332 table_name,
333 DATABASE_POOL.logic_plugin.as_ref().unwrap().column(),
334 DATABASE_POOL.logic_plugin.as_ref().unwrap().deleted(),
335 );
336 }
337 }
338 let (where_sql, args) = wrapper.build(1).unwrap();
339 if !where_sql.is_empty() {
340 sql = format!("{} where {}", sql, where_sql);
341 }
342 trace_exec_log(&sql, &args);
344
345 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
347 let client = pools.get().await.unwrap();
348 let statement = client.prepare(sql.as_str()).await.unwrap();
349 let result = client.execute(&statement, &args).await;
350 match result {
351 Ok(t) => {
352 return Ok(t);
354 }
355 Err(e) => {
356 error!("{:?}", e);
358 return Err(e);
359 }
360 }
361}
362
363pub async fn fetch<T>(
389 wrapper: Wrapper<'_>,
390 prefix: Option<String>,
391 suffix: Option<String>,
392) -> Result<Vec<T>, Error>
393where
394 T: Parameters,
395{
396 let field_name = wrapper.clone().get_recoder_field::<T>();
397 let table_name = T::get_table_name(prefix, suffix);
398 let (where_sql, args) = wrapper.clone().build(1).unwrap();
401 let logic_plugin_sql = get_logic_undelete(&table_name);
403
404 let mut sql = String::new();
405 sql.push_str("SELECT ");
406 sql.push_str(field_name.as_str());
407 sql.push_str(" FROM ");
408 sql.push_str(table_name.as_str());
409
410 let mut had_where = false;
411 if !where_sql.is_empty() {
417 sql.push_str(" WHERE ");
418 sql.push_str(where_sql.as_str());
419 had_where = true;
420 }
421
422 if !logic_plugin_sql.is_empty() {
423 if had_where {
424 sql.push_str(" AND ");
425 } else {
426 sql.push_str(" WHERE ");
427 }
428 sql.push_str(logic_plugin_sql.as_str());
429 }
430
431 let order_by_format = match wrapper.get_order_by() {
435 Some(t) => t,
436 None => " ".to_string(),
437 };
438 sql.push_str(&order_by_format);
439
440 trace_exec_log(&sql, &args);
442 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
443 let client = pools.get().await.unwrap();
444 let statement = client.prepare(sql.as_str()).await.unwrap();
445 let result = client.query(&statement, &args).await?;
446 let result_t = T::return_list(result);
449
450 return Ok(result_t);
451}
452
453pub async fn fetch_with_recoder_field<T>(
454 wrapper: Wrapper<'_>,
455 prefix: Option<String>,
456 suffix: Option<String>,
457) -> Result<Vec<HashMap<String, Value>>, Error>
458where
459 T: Parameters,
460{
461 let field_name = wrapper.clone().get_recoder_field::<T>();
462 let table_name = T::get_table_name(prefix, suffix);
463 let (where_sql, args) = wrapper.clone().build(1).unwrap();
466 let logic_plugin_sql = get_logic_undelete(&table_name);
468
469 let mut sql = String::new();
470 sql.push_str("SELECT ");
471 sql.push_str(field_name.as_str());
472 sql.push_str(" FROM ");
473 sql.push_str(table_name.as_str());
474
475 let mut had_where = false;
476 if !where_sql.is_empty() {
482 sql.push_str(" WHERE ");
483 sql.push_str(where_sql.as_str());
484 had_where = true;
485 }
486
487 if !logic_plugin_sql.is_empty() {
488 if had_where {
489 sql.push_str(" AND ");
490 } else {
491 sql.push_str(" WHERE ");
492 }
493 sql.push_str(logic_plugin_sql.as_str());
494 }
495
496 let order_by_format = match wrapper.get_order_by() {
500 Some(t) => t,
501 None => " ".to_string(),
502 };
503 sql.push_str(&order_by_format);
504
505 trace_exec_log(&sql, &args);
507 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
508 let client = pools.get().await.unwrap();
509 let statement = client.prepare(sql.as_str()).await.unwrap();
510 let result = client.query(&statement, &args).await?;
511 let mut new_row_hashmap = Vec::new();
515
516 for row in result {
517 new_row_hashmap.push(unit::pg_value_to_json_value(&row));
518 }
519
520 return Ok(new_row_hashmap);
521}
522
523pub async fn fetch_one<T>(
525 wrapper: Wrapper<'_>,
526 prefix: Option<String>,
527 suffix: Option<String>,
528) -> Result<T, Error>
529where
530 T: Parameters,
531{
532 let field_name = wrapper.clone().get_recoder_field::<T>();
534 let table_name = T::get_table_name(prefix, suffix);
535
536 let (where_sql, args) = wrapper.build(1).unwrap();
537
538 let logic_plugin_sql = get_logic_undelete(&table_name);
540
541 let mut sql = String::new();
542 sql.push_str("SELECT ");
543 sql.push_str(field_name.as_str());
544 sql.push_str(" FROM ");
545 sql.push_str(table_name.as_str());
546
547 let mut had_where = false;
548 if !where_sql.is_empty() {
554 sql.push_str(" WHERE ");
555 sql.push_str(where_sql.as_str());
556 had_where = true;
557 }
558
559 if !logic_plugin_sql.is_empty() {
560 if had_where {
561 sql.push_str(" AND ");
562 } else {
563 sql.push_str(" WHERE ");
564 }
565 sql.push_str(logic_plugin_sql.as_str());
566 }
567 trace_exec_log(&sql, &args);
569 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
570 let client = pools.get().await.unwrap();
571 if client.is_closed() {}
572 let statement = client.prepare(sql.as_str()).await.unwrap();
573 let result = client.query_one(&statement, &args).await;
574 match result {
576 Ok(row) => {
577 let result_t = T::return_one(row);
578 return Ok(result_t);
579 }
580 Err(e) => {
581 return Err(e);
583 }
584 }
585}
586
587pub async fn fetch_page<T>(
589 wrapper: Wrapper<'_>,
590 prefix: Option<String>,
591 suffix: Option<String>,
592) -> Result<PageT<T>, Error>
593where
594 T: Parameters,
595{
596 let (_limit_str, _limit, page_no, page_size) = wrapper.clone().get_page_info();
599
600 let recoder_field = wrapper.clone().get_recoder_field::<T>();
601 let table_name = T::get_table_name(prefix, suffix);
603
604 let (where_sql, args) = wrapper.clone().build(1).unwrap();
605
606 let logic_plugin_sql = get_logic_undelete(&table_name);
608
609 let mut sql = String::new();
610 sql.push_str("SELECT ");
611 sql.push_str(recoder_field.as_str());
612 sql.push_str(" FROM ");
613 sql.push_str(table_name.as_str());
614
615 let mut had_where = false;
616 if !where_sql.is_empty() {
622 sql.push_str(" WHERE ");
623 sql.push_str(where_sql.as_str());
624 had_where = true;
625 }
626
627 if !logic_plugin_sql.is_empty() {
629 if had_where {
630 sql.push_str(" AND ");
631 } else {
632 sql.push_str(" WHERE ");
633 }
634 sql.push_str(logic_plugin_sql.as_str());
635 }
636
637 let re_query_page_total = query_page_total(&sql, &args).await;
638 debug!("re_query_page_total: {:?}", re_query_page_total);
639 let total = match re_query_page_total {
640 Ok(total) => total,
641 Err(err_str) => return Err(err_str),
642 };
643 if total == 0 {
644 let result = PageT {
645 records: Vec::new(),
647 total: 0,
649 pages: 0,
651 page_no,
653 page_size,
655 };
656
657 return Ok(result);
658 }
659
660 match wrapper.clone().get_order_by() {
662 Some(order_by) => {
663 sql.push_str(order_by.as_str());
664 }
665 None => (),
666 };
667
668 let (limit_str, _limit, page_no, page_size) = wrapper.clone().get_page_info();
670 let mut pages = total / page_size;
671 let duoyu = total % page_size;
672 if duoyu > 0 {
673 pages = pages + 1
674 }
675
676 sql.push_str(limit_str.as_str());
677
678 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
679 let client = pools.get().await.unwrap();
680 let statement = client.prepare(sql.as_str()).await.unwrap();
681 let result = client.query(&statement, &args).await;
682
683 let result = match result {
684 Ok(t) => t,
685 Err(e) => {
686 return Err(e);
688 }
689 };
690
691 let records = T::return_list(result);
692
693 let result = PageT {
694 records,
696 total,
698 pages,
700 page_no,
702 page_size,
704 };
705
706 return Ok(result);
707}
708
709pub async fn query(
712 sql: &str,
713 params: &[&(dyn ToSql + Sync)],
714) -> Result<Vec<HashMap<String, Value>>, Error> {
715 trace_exec_log(sql, ¶ms);
717 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
718 let client = pools.get().await.unwrap();
719 let statement = client.prepare(&sql).await.unwrap();
720 let opt_result = client.query(&statement, params).await;
721 let mut new_row_hashmap = Vec::new();
722 match opt_result {
723 Ok(result) => {
724 for row in result {
725 new_row_hashmap.push(unit::pg_value_to_json_value(&row));
726 }
727 }
728 Err(e) => {
729 return Err(e);
730 }
731 }
732 return Ok(new_row_hashmap);
733}
734
735pub async fn query_t<T>(sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<T>, Error>
737where
738 T: Parameters,
739{
740 trace_exec_log(&sql, ¶ms);
742 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
743 let client = pools.get().await.unwrap();
744 let statement = client.prepare(&sql).await.unwrap();
745 let result = client.query(&statement, params).await;
746
747 let result = match result {
748 Ok(t) => t,
749 Err(e) => {
750 return Err(e);
752 }
753 };
754 let records = T::return_list(result);
755
756 return Ok(records);
757}
758
759pub async fn query_one(
760 sql: &str,
761 params: &[&(dyn ToSql + Sync)],
762) -> Result<HashMap<String, Value>, String> {
763 trace_exec_log(&sql, ¶ms);
764 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
765 let client = pools.get().await.unwrap();
766 let statement = client.prepare(&sql).await.unwrap();
767 let opt_result = client.query_one(&statement, params).await;
768 match opt_result {
770 Ok(result) => {
771 return Ok(unit::pg_value_to_json_value(&result));
772 }
773 Err(e) => {
774 let err_str = format!("{}", e);
775 return Err(err_str);
776 }
777 }
778}
779
780async fn query_page_total(sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
783 let mut opt_index = sql.find(" from ");
784 debug!("opt_index1: {:?}",opt_index);
785 opt_index = if opt_index.is_none() {
786 sql.find(" FROM ")
787 }else{
788 opt_index
789 };
790 let index = opt_index.take().unwrap();
791
792
793 let sql_from = &sql[index..sql.len()];
795 let total_sql = format!("select count(*) as total {}", sql_from);
796 trace_exec_log(&sql, params);
798 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
800 let client = pools.get().await.unwrap();
801 let statement = client.prepare(&total_sql).await.unwrap();
802 let opt_result = client.query_one(&statement, params).await;
803 match opt_result {
804 Ok(result) => {
805 let total: i64 = result.get("total");
806 return Ok(total as u64);
807 }
808 Err(e) => {
809 return Err(e);
811 }
812 }
813}
814
815pub async fn query_page(
816 sql: &str,
817 params: &[&(dyn ToSql + Sync)],
818 order_by: &str,
819 desc: bool,
820 page_no: u64,
821 page_size: u64,
822) -> Result<PageHash, Error> {
823 let re_query_page_total = query_page_total(&sql, params).await;
827
828 let total = match re_query_page_total {
829 Ok(total) => total,
830 Err(err_str) => return Err(err_str),
831 };
832
833 let pages = total / page_size;
835
836 let mut new_row_hashmap = Vec::new();
837 if total == 0 {
839 let page_data = PageHash {
840 records: new_row_hashmap,
841 total: total,
842 pages,
843 page_no,
844 page_size,
845 };
846 return Ok(page_data);
847 }
848
849 let b_order_desc = if desc { "DESC" } else { "ASC" };
851 let offset = (page_no - 1) * page_size;
854
855 let row_sql = format!(
856 "{} ORDER BY {} {} LIMIT {} OFFSET {};",
857 sql, order_by, b_order_desc, page_size, offset
858 );
859 trace_exec_log(&row_sql, params);
860 trace_exec_log(&sql, ¶ms);
861 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
862 let client = pools.get().await.unwrap();
863 let statement = client.prepare(&row_sql).await.unwrap();
864 let opt_result = client.query(&statement, params).await;
865
866 match opt_result {
867 Ok(result) => {
868 for row in result {
869 let hashmap = unit::pg_value_to_json_value(&row);
870 new_row_hashmap.push(hashmap);
871 }
872 }
873 Err(e) => {
874 return Err(e);
876 }
877 }
878
879 let page_data = PageHash {
880 records: new_row_hashmap,
881 total: total,
882 pages,
883 page_no,
884 page_size,
885 };
886 Ok(page_data)
888}
889
890pub async fn check_row_by_column<C, T>(
891 table_name: &str,
892 column: &C,
893 value: T,
894) -> Result<bool, Error>
895where
896 C: ColumnExt,
897 T: ToSql + Sync,
898{
899 let column_name = column.get();
900 let mut sql = format!(
901 "SELECT \"{}\" FROM \"{}\" WHERE \"{}\" = $1",
902 column_name, table_name, column_name
903 );
904
905 let logic_plugin_sql = get_logic_undelete(&table_name);
907
908 if !logic_plugin_sql.is_empty() {
909 sql.push_str(" AND ");
910 sql.push_str(logic_plugin_sql.as_str());
911 }
912 debug!("check_row_by_column sql :{}", sql);
913 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
914 let client = pools.get().await.unwrap();
915 let result = client.query(sql.as_str(), &[&value]).await;
928 match result {
929 Ok(opt_row) => {
930 if opt_row.len() > 0 {
931 return Ok(true);
932 } else {
933 return Ok(false);
934 }
935 }
936 Err(e) => {
937 error!("check_row_by_column:{}", e);
939 return Err(e);
940 }
941 }
942}
943
944pub async fn check_row_wrap<T>(
945 wrapper: Wrapper<'_>,
946 prefix: Option<String>,
947 suffix: Option<String>,
948) -> Result<bool, Error>
949where
950 T: Parameters,
951{
952 let table_name = T::get_table_name(prefix, suffix);
953 let (where_sql, args) = wrapper.clone().build(1).unwrap();
954
955 let logic_plugin_sql = get_logic_undelete(&table_name);
957
958 let mut sql = String::new();
959 sql.push_str("SELECT ");
960 sql.push_str(" 1 ");
961 sql.push_str(" FROM ");
962 sql.push_str(table_name.as_str());
963
964 let mut had_where = false;
965 if args.len() != 0 {
966 sql.push_str(" WHERE ");
967 sql.push_str(where_sql.as_str());
968 had_where = true;
969 }
970
971 if !logic_plugin_sql.is_empty() {
973 if had_where {
974 sql.push_str(" AND ");
975 } else {
976 sql.push_str(" WHERE ");
977 }
978 sql.push_str(logic_plugin_sql.as_str());
979 }
980 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
981 let client = pools.get().await.unwrap();
982 let statement = client.prepare(sql.as_str()).await.unwrap();
983 let result = client.query(&statement, &args).await;
984 match result {
985 Ok(opt_row) => {
986 if opt_row.len() == 0 {
987 return Ok(false);
988 } else {
989 return Ok(true);
990 }
991 }
994 Err(e) => {
995 return Err(e);
997 }
998 }
999}
1000
1001fn get_logic_undelete(table_name: &str) -> String {
1002 let logic_plugin_sql = if DATABASE_POOL.logic_plugin.is_some() {
1003 if !DATABASE_POOL
1005 .logic_plugin
1006 .as_ref()
1007 .unwrap()
1008 .is_exclusion_table(table_name)
1009 {
1010 format!(
1011 " \"{}\" = {} ",
1012 DATABASE_POOL.logic_plugin.as_ref().unwrap().column(),
1013 DATABASE_POOL.logic_plugin.as_ref().unwrap().un_deleted(),
1014 )
1015 } else {
1016 EMPTY_STRING.to_string()
1017 }
1018 } else {
1019 EMPTY_STRING.to_string()
1020 };
1021 logic_plugin_sql
1022}
/// Wraps the contained text in `%` on both sides for a "contains" LIKE pattern.
///
/// Returns `None` when `obj` is `None`.
pub fn format_like(obj: &Option<String>) -> Option<String> {
    obj.as_ref().map(|text| format!("%{}%", text))
}
/// Prefixes the contained text with `%` for an "ends with" LIKE pattern.
///
/// Returns `None` when `obj` is `None`.
pub fn format_like_left(obj: &Option<String>) -> Option<String> {
    obj.as_ref().map(|text| format!("%{}", text))
}
/// Appends `%` to the contained text for a "starts with" LIKE pattern.
///
/// Returns `None` when `obj` is `None`.
pub fn format_like_right(obj: &Option<String>) -> Option<String> {
    obj.as_ref().map(|text| format!("{}%", text))
}
1053
1054pub async fn transaction_delete_and_save<T, V>(
1056 delete_column: &'static str,
1057 delete_value: &V,
1058 p: Vec<T>,
1059 prefix: Option<String>,
1060 suffix: Option<String>,
1061) -> Result<u64, Error>
1062where
1063 T: Parameters,
1064 V: ToSql + Sync,
1065{
1066 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
1067 let mut client = pools.get().await.unwrap();
1068 let transaction = client.transaction().await.unwrap();
1069 let table_name = T::get_table_name(prefix.clone(), suffix.clone());
1071 let sql = format!(
1072 "DELETE FROM \"{}\" where \"{}\" = $1",
1073 &table_name, delete_column
1074 );
1075 let statement = transaction.prepare(sql.as_str()).await.unwrap();
1077 match transaction.execute(&statement, &[delete_value]).await {
1078 Ok(_) => {
1079 debug!("删除执行成功");
1080 }
1081 Err(e) => {
1082 error!("删除执行失败: {}", e);
1083 transaction.rollback().await.unwrap();
1084 return Err(e);
1085 }
1086 }
1087
1088 for item in p.iter() {
1089 let (sql, params) = item.gen_save(prefix.clone(), suffix.clone()).unwrap();
1090 let statement = transaction.prepare(sql.as_str()).await.unwrap();
1091 let result = transaction.execute(&statement, ¶ms).await;
1092 match result {
1093 Ok(_t) => {
1094 trace!("SAVE 执行成功");
1095 }
1097 Err(e) => {
1098 transaction.rollback().await.unwrap();
1099 error!("{:?}", e);
1100 return Err(e);
1101 }
1102 }
1103 }
1104 let result = transaction.commit().await;
1105
1106 if result.is_err() {
1107 error!(" transaction.commit() 执行 提交失败!");
1108 }
1110 return Ok(0);
1111}
1112
/// One statement of a `transaction_batch` call.
#[derive(Clone, Debug)]
pub struct TransactionBatchParam<'a> {
    /// SQL text that is prepared and executed inside the transaction.
    pub statement_sql: &'a str,
    /// Arguments bound to the statement's placeholders.
    pub params: Vec<&'a (dyn ToSql + Sync)>,
}
1118
1119pub async fn transaction_batch(param: Vec<TransactionBatchParam<'_>>) -> Result<u64, String> {
1121 let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
1122 let mut client = pools.get().await.unwrap();
1123 let transaction = client.transaction().await.unwrap();
1124 for item in param.iter() {
1126 let statement = transaction.prepare(item.statement_sql).await.unwrap();
1127 match transaction.execute(&statement, &item.params).await {
1128 Ok(t) => {
1129 if t == 0 {
1131 if !item.statement_sql.find("DELETE").is_some(){
1132 transaction.rollback().await.unwrap();
1133 error!("执行的SQL没有任何意义,响应行数为0");
1134 return Err("执行的SQL没有任何意义,响应行数为0".to_string());
1135 }
1136 }
1137 }
1138 Err(e) => {
1139 transaction.rollback().await.unwrap();
1141 error!("{:?}", e);
1142 return Err(e.to_string());
1143 }
1144 }
1145 }
1146 let result = transaction.commit().await;
1147
1148 if result.is_err() {
1149 error!(" transaction.commit() 执行 提交失败!");
1150 }
1152
1153 return Ok(0);
1154}