pgbatis/
lib.rs

1/*
2 * @Author: venom
3 * @Date: 2021-08-13 16:42:05
4 * @LastEditors: BuddyCoder
5 * @LastEditTime: 2024-05-23 07:55:41
6 * @Description:
7 * @FilePath: /pgbatis/src/lib.rs
8 * MIT
9 */
10
11use lazy_static::lazy_static;
12use std::collections::HashMap;
13use std::sync::Mutex;
14use tokio_postgres::Error;
15
16use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
17use once_cell::sync::Lazy;
18pub use pgmacro;
19pub use plugin::logic_delete::LogicDelete;
20pub use plugin::logic_delete::LogicDeletePlugin;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::fmt::Debug;
24use tokio::sync::OnceCell;
25pub use tokio_postgres;
26pub use tokio_postgres::{types::ToSql, Client, NoTls, Row};
27pub mod plugin;
28pub mod wrapper;
29pub use wrapper::Wrapper;
30pub mod unit;
31use tracing::{debug, error, info, trace};
32
33//迫不得已,当连接断开之后 无法重新连接。只能启用连接一次 重连一次的做法。每次都生成独立的连接
34//性能必定受损,罢了罢了。虽然 deadpool 是一个不错的连接池
35static EMPTY_STRING: &'static str = r#""#;
36pub const LOG_SPACE: &'static str =
37    "                                                                ";
38lazy_static! {
39    static  ref EXCLUSION_TABLE: Mutex<Vec<String>>  = {
40        Mutex::new(Vec::new())
41      //  Arc::new(m)
42    };
43}
44
45static DATABASE_POOL: Lazy<Pgbatis> = Lazy::new(|| {
46    let mut m = Pgbatis::new();
47    let exclusion_table_m = EXCLUSION_TABLE.lock().unwrap();
48    let exclusion_table = exclusion_table_m.clone();
49    // let mut exclusion_table = Vec::new();
50    // for exclusion in exclusion_table_str {
51    //     let table = String::from(exclusion);
52    //     exclusion_table.push(table);
53    // }
54
55    m.logic_plugin = Some(Box::new(LogicDeletePlugin::new_opt(
56        "is_deleted",
57        1,
58        0,
59        exclusion_table,
60    )));
61    // let mut exclusion_table = Vec::new();
62    m
63});
64
65//定义参数的特性
66pub trait Parameters {
67    // fn get_table_name(prefix: &str,suffix:&str) -> String;
68    fn get_table_name(prefix: Option<String>, suffix: Option<String>) -> String;
69    fn get_field_list() -> String;
70    //生成保存的字符串
71    fn gen_save(
72        &self,
73        prefix: Option<String>,
74        suffix: Option<String>,
75    ) -> Result<(String, Vec<&(dyn ToSql + Sync)>), String>;
76    //生成修改更新的字符串
77    fn gen_update(
78        &self,
79        prefix: Option<String>,
80        suffix: Option<String>,
81    ) -> Result<(String, Vec<&(dyn ToSql + Sync)>, u32), String>;
82    fn return_one(rows: Row) -> Self;
83    fn return_list(rows: Vec<Row>) -> Vec<Self>
84    where
85        Self: Sized,
86    {
87        let mut result_list = Vec::new();
88        for row in rows {
89            result_list.push(Self::return_one(row))
90        }
91
92        return result_list;
93    }
94}
95
96pub trait ColumnExt {
97    fn get(&self) -> &'static str;
98}
99
100//声明一个tokio_postgres的连接管理类
101
102pub struct Pgbatis {
103    pub pools: OnceCell<Pool>,
104    //pub client:HashMap<i32,Client>,
105    pub logic_plugin: Option<Box<dyn LogicDelete>>,
106}
107
108impl Pgbatis {
109    pub fn new() -> Self {
110        return Self::new_with_opt();
111    }
112
113    ///new Rbatis from Option
114    pub fn new_with_opt() -> Self {
115        return Self {
116            pools: OnceCell::new(),
117            //逻辑删除插件
118            logic_plugin: None,
119        };
120    }
121
122    pub fn set_logic_delete(&mut self, arg: Option<impl LogicDelete + 'static>) {
123        match arg {
124            Some(v) => {
125                self.logic_plugin = Some(Box::new(v));
126            }
127            None => {
128                self.logic_plugin = None;
129            }
130        }
131        //   self.logic_plugin = Some(arg)
132    }
133}
134
135fn trace_exec_log(sql: &str, args: &[&(dyn ToSql + Sync)]) {
136    let format = format!(
137        "Exec   ==> {}\n{}[pgbatis] [{}] Args   ==> {:?}",
138        &sql, LOG_SPACE, "", args
139    );
140    trace!("{}", format);
141}
142
143//添加
144///设置无逻辑删除的表
145pub fn set_unlogic_delete_table(table_name: &str) {
146    let mut exclusion_table_m = EXCLUSION_TABLE.lock().unwrap();
147    exclusion_table_m.push(table_name.to_string());
148}
149
150pub async fn link(
151    host: &str,
152    port: u16,
153    user: &str,
154    password: &str,
155    dbname: &str,
156    max_size: usize,
157) -> Result<(), String> {
158    let mut pg_config = tokio_postgres::Config::new();
159    //   pg_config.options(&driver_url);
160    pg_config.host(host);
161    pg_config.port(port);
162    pg_config.user(user);
163    pg_config.password(password);
164    pg_config.dbname(dbname);
165    let mgr_config = ManagerConfig {
166        recycling_method: RecyclingMethod::Verified,
167    };
168
169    let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
170    //创建连接池数量
171    let pool = Pool::new(mgr, max_size);
172    let _ = DATABASE_POOL.pools.set(pool);
173
174    let driver_url = format!("postgresql://{}:{}@{}/{}", user, password, host, dbname);
175    info!("数据库连接:{}", driver_url);
176    //trace_exec_log(&driver_url, args);
177
178    //DATABASE_POOL.logic_plugin.as_ref().unwrap().insert_exclusion_table("hellow");
179
180    return Ok(());
181}
182
183#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
184pub struct PageT<T>
185where
186    T: Parameters,
187{
188    /// data
189    pub records: Vec<T>,
190    /// total num
191    pub total: u64,
192    /// pages
193    pub pages: u64,
194    /// current page index
195    pub page_no: u64,
196    /// default 10
197    pub page_size: u64,
198}
199#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
200pub struct PageHash {
201    /// data
202    pub records: Vec<HashMap<String, Value>>,
203    /// total num
204    pub total: u64,
205    /// pages
206    pub pages: u64,
207    /// current page index
208    pub page_no: u64,
209    /// default 10
210    pub page_size: u64,
211}
212
213pub async fn execute(sql: &String, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
214    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
215    let client = pools.get().await.unwrap();
216    let statement = client.prepare(sql.as_str()).await.unwrap();
217    let result = client.execute(&statement, &params).await;
218    match result {
219        Ok(t) => {
220            trace!("execute 执行成功");
221            return Ok(t);
222        }
223        Err(e) => {
224            // let err_str = format!("e:{}", e);
225            error!("{:?}", e);
226            return Err(e);
227        }
228    }
229}
230
231//如果表名需要前缀
232//pub async fn pix_save<T>(p: T,prefix:&str) -> Result<u64, String>
233/// 执行保存操作,执行insert操作
234pub async fn save<T>(p: T, prefix: Option<String>, suffix: Option<String>) -> Result<u64, Error>
235where
236    T: Parameters,
237{
238    let (sql, args) = p.gen_save(prefix, suffix).unwrap();
239    //跟踪日志
240    trace_exec_log(&sql, &args);
241    //取出  client
242    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
243    let client = pools.get().await.unwrap();
244    let statement = client.prepare(sql.as_str()).await.unwrap();
245    let result = client.execute(&statement, &args).await;
246    match result {
247        Ok(t) => {
248            trace!("SAVE 执行成功");
249            return Ok(t);
250        }
251        Err(e) => {
252            //let err_str = format!("e:{}", e);
253            error!("{:?}", e);
254            return Err(e);
255        }
256    }
257}
258//如果表名需要前缀
259//pub async fn update<T>(p: T, wrapper: Wrapper<'_>,prefix:&str) -> Result<u64, String>
260/// 执行更新操作,执行update操作
261pub async fn update<T>(
262    p: T,
263    wrapper: Wrapper<'_>,
264    prefix: Option<String>,
265    suffix: Option<String>,
266) -> Result<u64, Error>
267where
268    T: Parameters,
269{
270    //p.set_prefix(prefix);
271    let (mut sql, mut args, args_number) = p.gen_update(prefix.clone(), suffix.clone()).unwrap();
272
273    //获取逻辑删除部分的SQL
274    let table_name = T::get_table_name(prefix, suffix);
275    let logic_plugin_sql = get_logic_undelete(&table_name);
276
277    let (where_sql, mut where_args) = wrapper.build(args_number).unwrap();
278    if !where_sql.is_empty() && !logic_plugin_sql.is_empty() {
279        sql = format!("{} WHERE {} AND {}", sql, where_sql, logic_plugin_sql);
280    } else if !where_sql.is_empty() && logic_plugin_sql.is_empty() {
281        sql = format!("{} WHERE {}  ", sql, where_sql);
282    } else if where_sql.is_empty() && !logic_plugin_sql.is_empty() {
283        sql = format!("{} WHERE {}  ", sql, logic_plugin_sql);
284    }
285    args.append(&mut where_args);
286    //跟踪日志
287    trace_exec_log(&sql, &args);
288    //取出  client
289    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
290    let client = pools.get().await.unwrap();
291    let statement = client.prepare(sql.as_str()).await.unwrap();
292    let result = client.execute(&statement, &args).await;
293
294    match result {
295        Ok(t) => {
296          //  info!("更新执行成功");
297            return Ok(t);
298        }
299        Err(e) => {
300            //  let err_str = format!("e:{}", e);
301            error!("{:?}", e);
302            return Err(e);
303        }
304    }
305}
306
307/// 通过唯一ID执行删除操作
308pub async fn remove<P>(
309    wrapper: Wrapper<'_>,
310    prefix: Option<String>,
311    suffix: Option<String>,
312) -> Result<u64, Error>
313where
314    P: Parameters,
315{
316    //正常的写法  DELETE FROM tableName    WHERE
317    let table_name = P::get_table_name(prefix, suffix);
318    let mut sql = format!("DELETE FROM \"{}\"", &table_name);
319
320    //判断是否有逻辑删除插件
321    //如果有,进入逻辑删除插件
322    if DATABASE_POOL.logic_plugin.is_some() {
323        // 判断表名是否存在排除表中, 如果不存在,sql变成逻辑删除的方式
324        if !DATABASE_POOL
325            .logic_plugin
326            .as_ref()
327            .unwrap()
328            .is_exclusion_table(table_name.as_str())
329        {
330            sql = format!(
331                "UPDATE \"{}\" set \"{}\" = {} ",
332                table_name,
333                DATABASE_POOL.logic_plugin.as_ref().unwrap().column(),
334                DATABASE_POOL.logic_plugin.as_ref().unwrap().deleted(),
335            );
336        }
337    }
338    let (where_sql, args) = wrapper.build(1).unwrap();
339    if !where_sql.is_empty() {
340        sql = format!("{} where {}", sql, where_sql);
341    }
342    //跟踪日志
343    trace_exec_log(&sql, &args);
344
345    //取出  client
346    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
347    let client = pools.get().await.unwrap();
348    let statement = client.prepare(sql.as_str()).await.unwrap();
349    let result = client.execute(&statement, &args).await;
350    match result {
351        Ok(t) => {
352           // info!("更新执行成功");
353            return Ok(t);
354        }
355        Err(e) => {
356            //  let err_str = format!("e:{}", e);
357            error!("{:?}", e);
358            return Err(e);
359        }
360    }
361}
362
363/// 通过一个字段执行删除操作
364// pub async fn remove_by_column<P,C,T >(column: C, value: T,prefix: Option<String>,suffix:Option<String>) -> Result<u64, Error>
365// where
366//     P: Parameters,
367//     C: ColumnExt,
368//     T: ToSql + Sync,
369// {
370//     //获取逻辑删除部分的SQL
371//     let table_name = P::get_table_name(prefix,suffix);
372//     let wrapper = Wrapper::new().eq(&column, &value);
373//     remove(&table_name, wrapper).await
374// }
375
376/// 1、需要定义一个 trait fetch_result{}  用于赋值返回值与 新增修改的宏 进行分离
377///   主要目的需要查询数据库数据回来进行计算的业务。定义返回值,单表使用比较多
378/// fetch<T>(wrapper) -> Result<Vec<T>, Error>   单表不翻页   完成
379/// fetch_one<T>(wrapper) ->Result<T, Error>         单表单条数据
380/// fetch_page<T>(wrapper) ->Result<PageT<>, Error>     单表翻页
381
382/// 2、返回值定义Vec<HashMap<String,Value>> ,好处就是不需要定义返回值的结构体,可以根据需求调整sql的返回值,直接rep.
383/// query(&sql,&[args])->Result<Vec<HashMap<String,Value>>, Error>  主要用于联合查询,统计查询
384/// query_one(&sql,&[args])->Result<HashMap<String,Value>,Error>    联合查询单条数据
385/// query_page(&[返回字段],SQL,&[args])->Result<PageHash<HashMap<String,Value>>,Error> 联合查询进行翻页 统计查询
386
387///单表不翻页
388pub async fn fetch<T>(
389    wrapper: Wrapper<'_>,
390    prefix: Option<String>,
391    suffix: Option<String>,
392) -> Result<Vec<T>, Error>
393where
394    T: Parameters,
395{
396    let field_name = wrapper.clone().get_recoder_field::<T>();
397    let table_name = T::get_table_name(prefix, suffix);
398    // let order_by = wrapper.order_by.clone();
399    //  let desc = wrapper.desc.clone();
400    let (where_sql, args) = wrapper.clone().build(1).unwrap();
401    //获取逻辑未删除部分的SQL
402    let logic_plugin_sql = get_logic_undelete(&table_name);
403
404    let mut sql = String::new();
405    sql.push_str("SELECT ");
406    sql.push_str(field_name.as_str());
407    sql.push_str(" FROM ");
408    sql.push_str(table_name.as_str());
409
410    let mut had_where = false;
411    // if args.len() != 0 {
412    //     sql.push_str(" WHERE ");
413    //     sql.push_str(where_sql.as_str());
414    //     had_where = true;
415    // }
416    if !where_sql.is_empty() {
417        sql.push_str(" WHERE ");
418        sql.push_str(where_sql.as_str());
419        had_where = true;
420    }
421
422    if !logic_plugin_sql.is_empty() {
423        if had_where {
424            sql.push_str(" AND ");
425        } else {
426            sql.push_str(" WHERE ");
427        }
428        sql.push_str(logic_plugin_sql.as_str());
429    }
430
431    // if order_by !== None{
432    //     let desc_str = if desc {"desc"} else {"asc"};
433    //     let order_by_format = format!("order by {} {} " ,order_by.clone(),desc_str);
434    let order_by_format = match wrapper.get_order_by() {
435        Some(t) => t,
436        None => " ".to_string(),
437    };
438    sql.push_str(&order_by_format);
439
440    //跟踪日志
441    trace_exec_log(&sql, &args);
442    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
443    let client = pools.get().await.unwrap();
444    let statement = client.prepare(sql.as_str()).await.unwrap();
445    let result = client.query(&statement, &args).await?;
446    //  println!("result:{:?}", result);
447
448    let result_t = T::return_list(result);
449
450    return Ok(result_t);
451}
452
453pub async fn fetch_with_recoder_field<T>(
454    wrapper: Wrapper<'_>,
455    prefix: Option<String>,
456    suffix: Option<String>,
457) -> Result<Vec<HashMap<String, Value>>, Error>
458where
459    T: Parameters,
460{
461    let field_name = wrapper.clone().get_recoder_field::<T>();
462    let table_name = T::get_table_name(prefix, suffix);
463    // let order_by = wrapper.order_by.clone();
464    //  let desc = wrapper.desc.clone();
465    let (where_sql, args) = wrapper.clone().build(1).unwrap();
466    //获取逻辑未删除部分的SQL
467    let logic_plugin_sql = get_logic_undelete(&table_name);
468
469    let mut sql = String::new();
470    sql.push_str("SELECT ");
471    sql.push_str(field_name.as_str());
472    sql.push_str(" FROM ");
473    sql.push_str(table_name.as_str());
474
475    let mut had_where = false;
476    // if args.len() != 0 {
477    //     sql.push_str(" WHERE ");
478    //     sql.push_str(where_sql.as_str());
479    //     had_where = true;
480    // }
481    if !where_sql.is_empty() {
482        sql.push_str(" WHERE ");
483        sql.push_str(where_sql.as_str());
484        had_where = true;
485    }
486
487    if !logic_plugin_sql.is_empty() {
488        if had_where {
489            sql.push_str(" AND ");
490        } else {
491            sql.push_str(" WHERE ");
492        }
493        sql.push_str(logic_plugin_sql.as_str());
494    }
495
496    // if order_by !== None{
497    //     let desc_str = if desc {"desc"} else {"asc"};
498    //     let order_by_format = format!("order by {} {} " ,order_by.clone(),desc_str);
499    let order_by_format = match wrapper.get_order_by() {
500        Some(t) => t,
501        None => " ".to_string(),
502    };
503    sql.push_str(&order_by_format);
504
505    //跟踪日志
506    trace_exec_log(&sql, &args);
507    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
508    let client = pools.get().await.unwrap();
509    let statement = client.prepare(sql.as_str()).await.unwrap();
510    let result = client.query(&statement, &args).await?;
511    //  println!("result:{:?}", result);
512
513    // let result_t = T::return_list(result);
514    let mut new_row_hashmap = Vec::new();
515
516    for row in result {
517        new_row_hashmap.push(unit::pg_value_to_json_value(&row));
518    }
519
520    return Ok(new_row_hashmap);
521}
522
523///fetch_one<T>(wrapper) ->Result<T, Error>         单表单条数据
524pub async fn fetch_one<T>(
525    wrapper: Wrapper<'_>,
526    prefix: Option<String>,
527    suffix: Option<String>,
528) -> Result<T, Error>
529where
530    T: Parameters,
531{
532    //let field_name = T::get_field_list();
533    let field_name = wrapper.clone().get_recoder_field::<T>();
534    let table_name = T::get_table_name(prefix, suffix);
535
536    let (where_sql, args) = wrapper.build(1).unwrap();
537
538    //获取逻辑未删除部分的SQL
539    let logic_plugin_sql = get_logic_undelete(&table_name);
540
541    let mut sql = String::new();
542    sql.push_str("SELECT ");
543    sql.push_str(field_name.as_str());
544    sql.push_str(" FROM ");
545    sql.push_str(table_name.as_str());
546
547    let mut had_where = false;
548    // if args.len() != 0 {
549    //     sql.push_str(" WHERE ");
550    //     sql.push_str(where_sql.as_str());
551    //     had_where = true;
552    // }
553    if !where_sql.is_empty() {
554        sql.push_str(" WHERE ");
555        sql.push_str(where_sql.as_str());
556        had_where = true;
557    }
558
559    if !logic_plugin_sql.is_empty() {
560        if had_where {
561            sql.push_str(" AND ");
562        } else {
563            sql.push_str(" WHERE ");
564        }
565        sql.push_str(logic_plugin_sql.as_str());
566    }
567    //跟踪日志
568    trace_exec_log(&sql, &args);
569    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
570    let client = pools.get().await.unwrap();
571    if client.is_closed() {}
572    let statement = client.prepare(sql.as_str()).await.unwrap();
573    let result = client.query_one(&statement, &args).await;
574    // println!("result:{:?}", result);
575    match result {
576        Ok(row) => {
577            let result_t = T::return_one(row);
578            return Ok(result_t);
579        }
580        Err(e) => {
581            // let error = format!("{}", e);
582            return Err(e);
583        }
584    }
585}
586
587//fetch_page<T>(wrapper) ->Result<PageT<>, Error>     单表翻页
588pub async fn fetch_page<T>(
589    wrapper: Wrapper<'_>,
590    prefix: Option<String>,
591    suffix: Option<String>,
592) -> Result<PageT<T>, Error>
593where
594    T: Parameters,
595{
596    // let wrapper = &wrapper;
597    //验证入参数
598    let (_limit_str, _limit, page_no, page_size) = wrapper.clone().get_page_info();
599
600    let recoder_field = wrapper.clone().get_recoder_field::<T>();
601    // let field_name = T::get_field_list();
602    let table_name = T::get_table_name(prefix, suffix);
603
604    let (where_sql, args) = wrapper.clone().build(1).unwrap();
605
606    //获取逻辑未删除部分的SQL
607    let logic_plugin_sql = get_logic_undelete(&table_name);
608
609    let mut sql = String::new();
610    sql.push_str("SELECT ");
611    sql.push_str(recoder_field.as_str());
612    sql.push_str(" FROM ");
613    sql.push_str(table_name.as_str());
614
615    let mut had_where = false;
616    // if args.len() != 0 {
617    //     sql.push_str(" WHERE ");
618    //     sql.push_str(where_sql.as_str());
619    //     had_where = true;
620    // }
621    if !where_sql.is_empty() {
622        sql.push_str(" WHERE ");
623        sql.push_str(where_sql.as_str());
624        had_where = true;
625    }
626
627    //增加逻辑删除
628    if !logic_plugin_sql.is_empty() {
629        if had_where {
630            sql.push_str(" AND ");
631        } else {
632            sql.push_str(" WHERE ");
633        }
634        sql.push_str(logic_plugin_sql.as_str());
635    }
636
637    let re_query_page_total = query_page_total(&sql, &args).await;
638    debug!("re_query_page_total: {:?}", re_query_page_total);
639    let total = match re_query_page_total {
640        Ok(total) => total,
641        Err(err_str) => return Err(err_str),
642    };
643    if total == 0 {
644        let result = PageT {
645            // data
646            records: Vec::new(),
647            // total num
648            total: 0,
649            // pages
650            pages: 0,
651            // current page index
652            page_no,
653            // default 10
654            page_size,
655        };
656
657        return Ok(result);
658    }
659
660    //增加是否排序
661    match wrapper.clone().get_order_by() {
662        Some(order_by) => {
663            sql.push_str(order_by.as_str());
664        }
665        None => (),
666    };
667
668    //增加翻页
669    let (limit_str, _limit, page_no, page_size) = wrapper.clone().get_page_info();
670    let mut pages = total / page_size;
671    let duoyu = total % page_size;
672    if duoyu > 0 {
673        pages = pages + 1
674    }
675
676    sql.push_str(limit_str.as_str());
677
678    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
679    let client = pools.get().await.unwrap();
680    let statement = client.prepare(sql.as_str()).await.unwrap();
681    let result = client.query(&statement, &args).await;
682
683    let result = match result {
684        Ok(t) => t,
685        Err(e) => {
686            // let error_str = format!("{}", e);
687            return Err(e);
688        }
689    };
690
691    let records = T::return_list(result);
692
693    let result = PageT {
694        // data
695        records,
696        // total num
697        total,
698        // pages
699        pages,
700        // current page index
701        page_no,
702        // default 10
703        page_size,
704    };
705
706    return Ok(result);
707}
708
709//============================================================================================
710//不需要翻页的查询
711pub async fn query(
712    sql: &str,
713    params: &[&(dyn ToSql + Sync)],
714) -> Result<Vec<HashMap<String, Value>>, Error> {
715    //跟踪日志
716    trace_exec_log(sql, &params);
717    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
718    let client = pools.get().await.unwrap();
719    let statement = client.prepare(&sql).await.unwrap();
720    let opt_result = client.query(&statement, params).await;
721    let mut new_row_hashmap = Vec::new();
722    match opt_result {
723        Ok(result) => {
724            for row in result {
725                new_row_hashmap.push(unit::pg_value_to_json_value(&row));
726            }
727        }
728        Err(e) => {
729            return Err(e);
730        }
731    }
732    return Ok(new_row_hashmap);
733}
734
735//返回结构体的方式 排序由此决定
736pub async fn query_t<T>(sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<T>, Error>
737where
738    T: Parameters,
739{
740    //跟踪日志
741    trace_exec_log(&sql, &params);
742    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
743    let client = pools.get().await.unwrap();
744    let statement = client.prepare(&sql).await.unwrap();
745    let result = client.query(&statement, params).await;
746
747    let result = match result {
748        Ok(t) => t,
749        Err(e) => {
750            //let error_str = format!("{}", e);
751            return Err(e);
752        }
753    };
754    let records = T::return_list(result);
755
756    return Ok(records);
757}
758
759pub async fn query_one(
760    sql: &str,
761    params: &[&(dyn ToSql + Sync)],
762) -> Result<HashMap<String, Value>, String> {
763    trace_exec_log(&sql, &params);
764    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
765    let client = pools.get().await.unwrap();
766    let statement = client.prepare(&sql).await.unwrap();
767    let opt_result = client.query_one(&statement, params).await;
768    // let mut new_row_hashmap = Vec::new();
769    match opt_result {
770        Ok(result) => {
771            return Ok(unit::pg_value_to_json_value(&result));
772        }
773        Err(e) => {
774            let err_str = format!("{}", e);
775            return Err(err_str);
776        }
777    }
778}
779
780//查询行数
781//原理就是找出第一个 from 然后截取后面的条件。前面拼接上select count(*) from ......
782async fn query_page_total(sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
783    let mut opt_index = sql.find(" from ");
784    debug!("opt_index1: {:?}",opt_index);
785    opt_index =  if opt_index.is_none() {
786        sql.find(" FROM ")
787    }else{
788        opt_index
789    };
790    let index = opt_index.take().unwrap();
791    
792
793    //然后进行拼接
794    let sql_from = &sql[index..sql.len()];
795    let total_sql = format!("select count(*) as total {}", sql_from);
796    //跟踪日志
797    trace_exec_log(&sql, params);
798    //执行
799    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
800    let client = pools.get().await.unwrap();
801    let statement = client.prepare(&total_sql).await.unwrap();
802    let opt_result = client.query_one(&statement, params).await;
803    match opt_result {
804        Ok(result) => {
805            let total: i64 = result.get("total");
806            return Ok(total as u64);
807        }
808        Err(e) => {
809            // let err_str = format!("{}", e);
810            return Err(e);
811        }
812    }
813}
814
815pub async fn query_page(
816    sql: &str,
817    params: &[&(dyn ToSql + Sync)],
818    order_by: &str,
819    desc: bool,
820    page_no: u64,
821    page_size: u64,
822) -> Result<PageHash, Error> {
823    //验证入参
824    // if page_size <= 0 {return Err("page_size 必须大于0".to_string())};
825
826    let re_query_page_total = query_page_total(&sql, params).await;
827
828    let total = match re_query_page_total {
829        Ok(total) => total,
830        Err(err_str) => return Err(err_str),
831    };
832
833    //一共多少页
834    let pages = total / page_size;
835
836    let mut new_row_hashmap = Vec::new();
837    //如果等于0 就是没有数据 直接返回
838    if total == 0 {
839        let page_data = PageHash {
840            records: new_row_hashmap,
841            total: total,
842            pages,
843            page_no,
844            page_size,
845        };
846        return Ok(page_data);
847    }
848
849    //生成查询语句 拼接排序,页数,每页的数据条数
850    let b_order_desc = if desc { "DESC" } else { "ASC" };
851    //let order_str = order.join(",");
852    //LIMIT [no of rows] OFFSET [row num]
853    let offset = (page_no - 1) * page_size;
854
855    let row_sql = format!(
856        "{} ORDER BY  {} {} LIMIT {} OFFSET {};",
857        sql, order_by, b_order_desc, page_size, offset
858    );
859    trace_exec_log(&row_sql, params);
860    trace_exec_log(&sql, &params);
861    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
862    let client = pools.get().await.unwrap();
863    let statement = client.prepare(&row_sql).await.unwrap();
864    let opt_result = client.query(&statement, params).await;
865
866    match opt_result {
867        Ok(result) => {
868            for row in result {
869                let hashmap = unit::pg_value_to_json_value(&row);
870                new_row_hashmap.push(hashmap);
871            }
872        }
873        Err(e) => {
874            // let err_str = format!("{}", e);
875            return Err(e);
876        }
877    }
878
879    let page_data = PageHash {
880        records: new_row_hashmap,
881        total: total,
882        pages,
883        page_no,
884        page_size,
885    };
886    // return Ok(new_row_hashmap);
887    Ok(page_data)
888}
889
890pub async fn check_row_by_column<C, T>(
891    table_name: &str,
892    column: &C,
893    value: T,
894) -> Result<bool, Error>
895where
896    C: ColumnExt,
897    T: ToSql + Sync,
898{
899    let column_name = column.get();
900    let mut sql = format!(
901        "SELECT \"{}\" FROM \"{}\" WHERE \"{}\" = $1",
902        column_name, table_name, column_name
903    );
904
905    //获取逻辑未删除部分的SQL
906    let logic_plugin_sql = get_logic_undelete(&table_name);
907
908    if !logic_plugin_sql.is_empty() {
909        sql.push_str(" AND ");
910        sql.push_str(logic_plugin_sql.as_str());
911    }
912    debug!("check_row_by_column sql :{}", sql);
913    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
914    let client = pools.get().await.unwrap();
915    // let result = client.query_opt(sql.as_str(), &[&value]).await;
916    // match result {
917    //     Ok(opt_row) => match opt_row {
918    //         Some(_row) => return Ok(true),
919    //         None => return Ok(false),
920    //     },
921    //     Err(e) => {
922    //        // let err_str = format!("{}", e);
923    //         error!("check_row_by_column:{}",e);
924    //         return Err(e);
925    //     }
926    // }
927    let result = client.query(sql.as_str(), &[&value]).await;
928    match result {
929        Ok(opt_row) => {
930            if opt_row.len() > 0 {
931                return Ok(true);
932            } else {
933                return Ok(false);
934            }
935        }
936        Err(e) => {
937            // let err_str = format!("{}", e);
938            error!("check_row_by_column:{}", e);
939            return Err(e);
940        }
941    }
942}
943
944pub async fn check_row_wrap<T>(
945    wrapper: Wrapper<'_>,
946    prefix: Option<String>,
947    suffix: Option<String>,
948) -> Result<bool, Error>
949where
950    T: Parameters,
951{
952    let table_name = T::get_table_name(prefix, suffix);
953    let (where_sql, args) = wrapper.clone().build(1).unwrap();
954
955    //获取逻辑未删除部分的SQL
956    let logic_plugin_sql = get_logic_undelete(&table_name);
957
958    let mut sql = String::new();
959    sql.push_str("SELECT ");
960    sql.push_str(" 1 ");
961    sql.push_str(" FROM ");
962    sql.push_str(table_name.as_str());
963
964    let mut had_where = false;
965    if args.len() != 0 {
966        sql.push_str(" WHERE ");
967        sql.push_str(where_sql.as_str());
968        had_where = true;
969    }
970
971    //增加逻辑删除
972    if !logic_plugin_sql.is_empty() {
973        if had_where {
974            sql.push_str(" AND ");
975        } else {
976            sql.push_str(" WHERE ");
977        }
978        sql.push_str(logic_plugin_sql.as_str());
979    }
980    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
981    let client = pools.get().await.unwrap();
982    let statement = client.prepare(sql.as_str()).await.unwrap();
983    let result = client.query(&statement, &args).await;
984    match result {
985        Ok(opt_row) => {
986            if opt_row.len() == 0 {
987                return Ok(false);
988            } else {
989                return Ok(true);
990            }
991            // Some(_row) => return Ok(true),
992            // None => return Ok(false),
993        }
994        Err(e) => {
995            // let err_str = format!("{}", e);
996            return Err(e);
997        }
998    }
999}
1000
1001fn get_logic_undelete(table_name: &str) -> String {
1002    let logic_plugin_sql = if DATABASE_POOL.logic_plugin.is_some() {
1003        // 判断表名是否存在排除表中, 如果不存在,sql变成逻辑删除的方式
1004        if !DATABASE_POOL
1005            .logic_plugin
1006            .as_ref()
1007            .unwrap()
1008            .is_exclusion_table(table_name)
1009        {
1010            format!(
1011                " \"{}\" = {} ",
1012                DATABASE_POOL.logic_plugin.as_ref().unwrap().column(),
1013                DATABASE_POOL.logic_plugin.as_ref().unwrap().un_deleted(),
1014            )
1015        } else {
1016            EMPTY_STRING.to_string()
1017        }
1018    } else {
1019        EMPTY_STRING.to_string()
1020    };
1021    logic_plugin_sql
1022}
1023/// 字符串左右新增百分号
1024pub fn format_like(obj: &Option<String>) -> Option<String> {
1025    match obj {
1026        Some(t) => {
1027            let two = format!("%{}%", t);
1028            Some(two)
1029        }
1030        None => None,
1031    }
1032}
1033/// 字符串左新增百分号
1034pub fn format_like_left(obj: &Option<String>) -> Option<String> {
1035    match obj {
1036        Some(t) => {
1037            let two = format!("%{}", t);
1038            Some(two)
1039        }
1040        None => None,
1041    }
1042}
1043/// 字符串左新增百分号
1044pub fn format_like_right(obj: &Option<String>) -> Option<String> {
1045    match obj {
1046        Some(t) => {
1047            let two = format!("{}%", t);
1048            Some(two)
1049        }
1050        None => None,
1051    }
1052}
1053
1054///事务新增 用于按指定ID批量删除,然后批量新增
1055pub async fn transaction_delete_and_save<T, V>(
1056    delete_column: &'static str,
1057    delete_value: &V,
1058    p: Vec<T>,
1059    prefix: Option<String>,
1060    suffix: Option<String>,
1061) -> Result<u64, Error>
1062where
1063    T: Parameters,
1064    V: ToSql + Sync,
1065{
1066    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
1067    let mut client = pools.get().await.unwrap();
1068    let transaction = client.transaction().await.unwrap();
1069    //delete_columnlet column_name = delete_column.get();
1070    let table_name = T::get_table_name(prefix.clone(), suffix.clone());
1071    let sql = format!(
1072        "DELETE FROM \"{}\" where \"{}\" = $1",
1073        &table_name, delete_column
1074    );
1075    //执行删除
1076    let statement = transaction.prepare(sql.as_str()).await.unwrap();
1077    match transaction.execute(&statement, &[delete_value]).await {
1078        Ok(_) => {
1079            debug!("删除执行成功");
1080        }
1081        Err(e) => {
1082            error!("删除执行失败: {}", e);
1083            transaction.rollback().await.unwrap();
1084            return Err(e);
1085        }
1086    }
1087
1088    for item in p.iter() {
1089        let (sql, params) = item.gen_save(prefix.clone(), suffix.clone()).unwrap();
1090        let statement = transaction.prepare(sql.as_str()).await.unwrap();
1091        let result = transaction.execute(&statement, &params).await;
1092        match result {
1093            Ok(_t) => {
1094                trace!("SAVE 执行成功");
1095                //return Ok(t);
1096            }
1097            Err(e) => {
1098                transaction.rollback().await.unwrap();
1099                error!("{:?}", e);
1100                return Err(e);
1101            }
1102        }
1103    }
1104    let result = transaction.commit().await;
1105
1106    if result.is_err() {
1107        error!(" transaction.commit() 执行 提交失败!");
1108        //transaction.rollback().await.unwrap();
1109    }
1110    return Ok(0);
1111}
1112
1113#[derive(Clone, Debug)]
1114pub struct TransactionBatchParam<'a> {
1115    pub statement_sql: &'a str,
1116    pub params: Vec<&'a (dyn ToSql + Sync)>,
1117}
1118
1119///事务 批量更新/新增 ,不同的表
1120pub async fn transaction_batch(param: Vec<TransactionBatchParam<'_>>) -> Result<u64, String> {
1121    let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
1122    let mut client = pools.get().await.unwrap();
1123    let transaction = client.transaction().await.unwrap();
1124    //  transaction.execute(statement, params);
1125    for item in param.iter() {
1126        let statement = transaction.prepare(item.statement_sql).await.unwrap();
1127        match transaction.execute(&statement, &item.params).await {
1128            Ok(t) => {
1129                //如果执行没有相应任何一行,表示执行的SQL 没有意义 直接回滚??
1130                if t == 0 {
1131                    if !item.statement_sql.find("DELETE").is_some(){
1132                        transaction.rollback().await.unwrap();
1133                        error!("执行的SQL没有任何意义,响应行数为0");
1134                        return Err("执行的SQL没有任何意义,响应行数为0".to_string());
1135                    }
1136                }
1137            }
1138            Err(e) => {
1139                //执行回滚
1140                transaction.rollback().await.unwrap();
1141                error!("{:?}", e);
1142                return Err(e.to_string());
1143            }
1144        }
1145    }
1146    let result = transaction.commit().await;
1147
1148    if result.is_err() {
1149        error!(" transaction.commit() 执行 提交失败!");
1150        //transaction.rollback().await.unwrap();
1151    }
1152
1153    return Ok(0);
1154}