use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Mutex;
use tokio_postgres::Error;
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
use once_cell::sync::Lazy;
pub use pgmacro;
pub use plugin::logic_delete::LogicDelete;
pub use plugin::logic_delete::LogicDeletePlugin;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;
use tokio::sync::OnceCell;
pub use tokio_postgres;
pub use tokio_postgres::{types::ToSql, Client, NoTls, Row};
pub mod plugin;
pub mod wrapper;
pub use wrapper::Wrapper;
pub mod unit;
use tracing::{debug, error, info, trace};
static EMPTY_STRING: &'static str = r#""#;
pub const LOG_SPACE: &'static str =
" ";
lazy_static! {
static ref EXCLUSION_TABLE: Mutex<Vec<String>> = {
Mutex::new(Vec::new())
};
}
static DATABASE_POOL: Lazy<Pgbatis> = Lazy::new(|| {
let mut m = Pgbatis::new();
let exclusion_table_m = EXCLUSION_TABLE.lock().unwrap();
let exclusion_table = exclusion_table_m.clone();
m.logic_plugin = Some(Box::new(LogicDeletePlugin::new_opt(
"is_deleted",
1,
0,
exclusion_table,
)));
m
});
pub trait Parameters {
fn get_table_name(prefix: Option<String>, suffix: Option<String>) -> String;
fn get_field_list() -> String;
fn gen_save(
&self,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<(String, Vec<&(dyn ToSql + Sync)>), String>;
fn gen_update(
&self,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<(String, Vec<&(dyn ToSql + Sync)>, u32), String>;
fn return_one(rows: Row) -> Self;
fn return_list(rows: Vec<Row>) -> Vec<Self>
where
Self: Sized,
{
let mut result_list = Vec::new();
for row in rows {
result_list.push(Self::return_one(row))
}
return result_list;
}
}
pub trait ColumnExt {
fn get(&self) -> &'static str;
}
pub struct Pgbatis {
pub pools: OnceCell<Pool>,
pub logic_plugin: Option<Box<dyn LogicDelete>>,
}
impl Pgbatis {
pub fn new() -> Self {
return Self::new_with_opt();
}
pub fn new_with_opt() -> Self {
return Self {
pools: OnceCell::new(),
logic_plugin: None,
};
}
pub fn set_logic_delete(&mut self, arg: Option<impl LogicDelete + 'static>) {
match arg {
Some(v) => {
self.logic_plugin = Some(Box::new(v));
}
None => {
self.logic_plugin = None;
}
}
}
}
fn trace_exec_log(sql: &str, args: &[&(dyn ToSql + Sync)]) {
let format = format!(
"Exec ==> {}\n{}[pgbatis] [{}] Args ==> {:?}",
&sql, LOG_SPACE, "", args
);
trace!("{}", format);
}
pub fn set_unlogic_delete_table(table_name: &str) {
let mut exclusion_table_m = EXCLUSION_TABLE.lock().unwrap();
exclusion_table_m.push(table_name.to_string());
}
pub async fn link(
host: &str,
port: u16,
user: &str,
password: &str,
dbname: &str,
max_size: usize,
) -> Result<(), String> {
let mut pg_config = tokio_postgres::Config::new();
pg_config.host(host);
pg_config.port(port);
pg_config.user(user);
pg_config.password(password);
pg_config.dbname(dbname);
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Verified,
};
let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
let pool = Pool::new(mgr, max_size);
let _ = DATABASE_POOL.pools.set(pool);
let driver_url = format!("postgresql://{}:{}@{}/{}", user, password, host, dbname);
info!("数据库连接:{}", driver_url);
return Ok(());
}
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct PageT<T>
where
T: Parameters,
{
pub records: Vec<T>,
pub total: u64,
pub pages: u64,
pub page_no: u64,
pub page_size: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct PageHash {
pub records: Vec<HashMap<String, Value>>,
pub total: u64,
pub pages: u64,
pub page_no: u64,
pub page_size: u64,
}
pub async fn execute(sql: &String, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.execute(&statement, ¶ms).await;
match result {
Ok(t) => {
trace!("execute 执行成功");
return Ok(t);
}
Err(e) => {
error!("{:?}", e);
return Err(e);
}
}
}
pub async fn save<T>(p: T, prefix: Option<String>, suffix: Option<String>) -> Result<u64, Error>
where
T: Parameters,
{
let (sql, args) = p.gen_save(prefix, suffix).unwrap();
trace_exec_log(&sql, &args);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.execute(&statement, &args).await;
match result {
Ok(t) => {
trace!("SAVE 执行成功");
return Ok(t);
}
Err(e) => {
error!("{:?}", e);
return Err(e);
}
}
}
pub async fn update<T>(
p: T,
wrapper: Wrapper<'_>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<u64, Error>
where
T: Parameters,
{
let (mut sql, mut args, args_number) = p.gen_update(prefix.clone(), suffix.clone()).unwrap();
let table_name = T::get_table_name(prefix, suffix);
let logic_plugin_sql = get_logic_undelete(&table_name);
let (where_sql, mut where_args) = wrapper.build(args_number).unwrap();
if !where_sql.is_empty() && !logic_plugin_sql.is_empty() {
sql = format!("{} WHERE {} AND {}", sql, where_sql, logic_plugin_sql);
} else if !where_sql.is_empty() && logic_plugin_sql.is_empty() {
sql = format!("{} WHERE {} ", sql, where_sql);
} else if where_sql.is_empty() && !logic_plugin_sql.is_empty() {
sql = format!("{} WHERE {} ", sql, logic_plugin_sql);
}
args.append(&mut where_args);
trace_exec_log(&sql, &args);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.execute(&statement, &args).await;
match result {
Ok(t) => {
return Ok(t);
}
Err(e) => {
error!("{:?}", e);
return Err(e);
}
}
}
pub async fn remove<P>(
wrapper: Wrapper<'_>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<u64, Error>
where
P: Parameters,
{
let table_name = P::get_table_name(prefix, suffix);
let mut sql = format!("DELETE FROM \"{}\"", &table_name);
if DATABASE_POOL.logic_plugin.is_some() {
if !DATABASE_POOL
.logic_plugin
.as_ref()
.unwrap()
.is_exclusion_table(table_name.as_str())
{
sql = format!(
"UPDATE \"{}\" set \"{}\" = {} ",
table_name,
DATABASE_POOL.logic_plugin.as_ref().unwrap().column(),
DATABASE_POOL.logic_plugin.as_ref().unwrap().deleted(),
);
}
}
let (where_sql, args) = wrapper.build(1).unwrap();
if !where_sql.is_empty() {
sql = format!("{} where {}", sql, where_sql);
}
trace_exec_log(&sql, &args);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.execute(&statement, &args).await;
match result {
Ok(t) => {
return Ok(t);
}
Err(e) => {
error!("{:?}", e);
return Err(e);
}
}
}
pub async fn fetch<T>(
wrapper: Wrapper<'_>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<Vec<T>, Error>
where
T: Parameters,
{
let field_name = wrapper.clone().get_recoder_field::<T>();
let table_name = T::get_table_name(prefix, suffix);
let (where_sql, args) = wrapper.clone().build(1).unwrap();
let logic_plugin_sql = get_logic_undelete(&table_name);
let mut sql = String::new();
sql.push_str("SELECT ");
sql.push_str(field_name.as_str());
sql.push_str(" FROM ");
sql.push_str(table_name.as_str());
let mut had_where = false;
if !where_sql.is_empty() {
sql.push_str(" WHERE ");
sql.push_str(where_sql.as_str());
had_where = true;
}
if !logic_plugin_sql.is_empty() {
if had_where {
sql.push_str(" AND ");
} else {
sql.push_str(" WHERE ");
}
sql.push_str(logic_plugin_sql.as_str());
}
let order_by_format = match wrapper.get_order_by() {
Some(t) => t,
None => " ".to_string(),
};
sql.push_str(&order_by_format);
trace_exec_log(&sql, &args);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.query(&statement, &args).await?;
let result_t = T::return_list(result);
return Ok(result_t);
}
pub async fn fetch_with_recoder_field<T>(
wrapper: Wrapper<'_>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<Vec<HashMap<String, Value>>, Error>
where
T: Parameters,
{
let field_name = wrapper.clone().get_recoder_field::<T>();
let table_name = T::get_table_name(prefix, suffix);
let (where_sql, args) = wrapper.clone().build(1).unwrap();
let logic_plugin_sql = get_logic_undelete(&table_name);
let mut sql = String::new();
sql.push_str("SELECT ");
sql.push_str(field_name.as_str());
sql.push_str(" FROM ");
sql.push_str(table_name.as_str());
let mut had_where = false;
if !where_sql.is_empty() {
sql.push_str(" WHERE ");
sql.push_str(where_sql.as_str());
had_where = true;
}
if !logic_plugin_sql.is_empty() {
if had_where {
sql.push_str(" AND ");
} else {
sql.push_str(" WHERE ");
}
sql.push_str(logic_plugin_sql.as_str());
}
let order_by_format = match wrapper.get_order_by() {
Some(t) => t,
None => " ".to_string(),
};
sql.push_str(&order_by_format);
trace_exec_log(&sql, &args);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.query(&statement, &args).await?;
let mut new_row_hashmap = Vec::new();
for row in result {
new_row_hashmap.push(unit::pg_value_to_json_value(&row));
}
return Ok(new_row_hashmap);
}
pub async fn fetch_one<T>(
wrapper: Wrapper<'_>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<T, Error>
where
T: Parameters,
{
let field_name = wrapper.clone().get_recoder_field::<T>();
let table_name = T::get_table_name(prefix, suffix);
let (where_sql, args) = wrapper.build(1).unwrap();
let logic_plugin_sql = get_logic_undelete(&table_name);
let mut sql = String::new();
sql.push_str("SELECT ");
sql.push_str(field_name.as_str());
sql.push_str(" FROM ");
sql.push_str(table_name.as_str());
let mut had_where = false;
if !where_sql.is_empty() {
sql.push_str(" WHERE ");
sql.push_str(where_sql.as_str());
had_where = true;
}
if !logic_plugin_sql.is_empty() {
if had_where {
sql.push_str(" AND ");
} else {
sql.push_str(" WHERE ");
}
sql.push_str(logic_plugin_sql.as_str());
}
trace_exec_log(&sql, &args);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
if client.is_closed() {}
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.query_one(&statement, &args).await;
match result {
Ok(row) => {
let result_t = T::return_one(row);
return Ok(result_t);
}
Err(e) => {
return Err(e);
}
}
}
pub async fn fetch_page<T>(
wrapper: Wrapper<'_>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<PageT<T>, Error>
where
T: Parameters,
{
let (_limit_str, _limit, page_no, page_size) = wrapper.clone().get_page_info();
let recoder_field = wrapper.clone().get_recoder_field::<T>();
let table_name = T::get_table_name(prefix, suffix);
let (where_sql, args) = wrapper.clone().build(1).unwrap();
let logic_plugin_sql = get_logic_undelete(&table_name);
let mut sql = String::new();
sql.push_str("SELECT ");
sql.push_str(recoder_field.as_str());
sql.push_str(" FROM ");
sql.push_str(table_name.as_str());
let mut had_where = false;
if !where_sql.is_empty() {
sql.push_str(" WHERE ");
sql.push_str(where_sql.as_str());
had_where = true;
}
if !logic_plugin_sql.is_empty() {
if had_where {
sql.push_str(" AND ");
} else {
sql.push_str(" WHERE ");
}
sql.push_str(logic_plugin_sql.as_str());
}
let re_query_page_total = query_page_total(&sql, &args).await;
debug!("re_query_page_total: {:?}", re_query_page_total);
let total = match re_query_page_total {
Ok(total) => total,
Err(err_str) => return Err(err_str),
};
if total == 0 {
let result = PageT {
records: Vec::new(),
total: 0,
pages: 0,
page_no,
page_size,
};
return Ok(result);
}
match wrapper.clone().get_order_by() {
Some(order_by) => {
sql.push_str(order_by.as_str());
}
None => (),
};
let (limit_str, _limit, page_no, page_size) = wrapper.clone().get_page_info();
let mut pages = total / page_size;
let duoyu = total % page_size;
if duoyu > 0 {
pages = pages + 1
}
sql.push_str(limit_str.as_str());
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.query(&statement, &args).await;
let result = match result {
Ok(t) => t,
Err(e) => {
return Err(e);
}
};
let records = T::return_list(result);
let result = PageT {
records,
total,
pages,
page_no,
page_size,
};
return Ok(result);
}
pub async fn query(
sql: &str,
params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<HashMap<String, Value>>, Error> {
trace_exec_log(sql, ¶ms);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(&sql).await.unwrap();
let opt_result = client.query(&statement, params).await;
let mut new_row_hashmap = Vec::new();
match opt_result {
Ok(result) => {
for row in result {
new_row_hashmap.push(unit::pg_value_to_json_value(&row));
}
}
Err(e) => {
return Err(e);
}
}
return Ok(new_row_hashmap);
}
pub async fn query_t<T>(sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<T>, Error>
where
T: Parameters,
{
trace_exec_log(&sql, ¶ms);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(&sql).await.unwrap();
let result = client.query(&statement, params).await;
let result = match result {
Ok(t) => t,
Err(e) => {
return Err(e);
}
};
let records = T::return_list(result);
return Ok(records);
}
pub async fn query_one(
sql: &str,
params: &[&(dyn ToSql + Sync)],
) -> Result<HashMap<String, Value>, String> {
trace_exec_log(&sql, ¶ms);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(&sql).await.unwrap();
let opt_result = client.query_one(&statement, params).await;
match opt_result {
Ok(result) => {
return Ok(unit::pg_value_to_json_value(&result));
}
Err(e) => {
let err_str = format!("{}", e);
return Err(err_str);
}
}
}
async fn query_page_total(sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
let mut opt_index = sql.find(" from ");
debug!("opt_index1: {:?}",opt_index);
opt_index = if opt_index.is_none() {
sql.find(" FROM ")
}else{
opt_index
};
let index = opt_index.take().unwrap();
let sql_from = &sql[index..sql.len()];
let total_sql = format!("select count(*) as total {}", sql_from);
trace_exec_log(&sql, params);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(&total_sql).await.unwrap();
let opt_result = client.query_one(&statement, params).await;
match opt_result {
Ok(result) => {
let total: i64 = result.get("total");
return Ok(total as u64);
}
Err(e) => {
return Err(e);
}
}
}
pub async fn query_page(
sql: &str,
params: &[&(dyn ToSql + Sync)],
order_by: &str,
desc: bool,
page_no: u64,
page_size: u64,
) -> Result<PageHash, Error> {
let re_query_page_total = query_page_total(&sql, params).await;
let total = match re_query_page_total {
Ok(total) => total,
Err(err_str) => return Err(err_str),
};
let pages = total / page_size;
let mut new_row_hashmap = Vec::new();
if total == 0 {
let page_data = PageHash {
records: new_row_hashmap,
total: total,
pages,
page_no,
page_size,
};
return Ok(page_data);
}
let b_order_desc = if desc { "DESC" } else { "ASC" };
let offset = (page_no - 1) * page_size;
let row_sql = format!(
"{} ORDER BY {} {} LIMIT {} OFFSET {};",
sql, order_by, b_order_desc, page_size, offset
);
trace_exec_log(&row_sql, params);
trace_exec_log(&sql, ¶ms);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(&row_sql).await.unwrap();
let opt_result = client.query(&statement, params).await;
match opt_result {
Ok(result) => {
for row in result {
let hashmap = unit::pg_value_to_json_value(&row);
new_row_hashmap.push(hashmap);
}
}
Err(e) => {
return Err(e);
}
}
let page_data = PageHash {
records: new_row_hashmap,
total: total,
pages,
page_no,
page_size,
};
Ok(page_data)
}
pub async fn check_row_by_column<C, T>(
table_name: &str,
column: &C,
value: T,
) -> Result<bool, Error>
where
C: ColumnExt,
T: ToSql + Sync,
{
let column_name = column.get();
let mut sql = format!(
"SELECT \"{}\" FROM \"{}\" WHERE \"{}\" = $1",
column_name, table_name, column_name
);
let logic_plugin_sql = get_logic_undelete(&table_name);
if !logic_plugin_sql.is_empty() {
sql.push_str(" AND ");
sql.push_str(logic_plugin_sql.as_str());
}
debug!("check_row_by_column sql :{}", sql);
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let result = client.query(sql.as_str(), &[&value]).await;
match result {
Ok(opt_row) => {
if opt_row.len() > 0 {
return Ok(true);
} else {
return Ok(false);
}
}
Err(e) => {
error!("check_row_by_column:{}", e);
return Err(e);
}
}
}
pub async fn check_row_wrap<T>(
wrapper: Wrapper<'_>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<bool, Error>
where
T: Parameters,
{
let table_name = T::get_table_name(prefix, suffix);
let (where_sql, args) = wrapper.clone().build(1).unwrap();
let logic_plugin_sql = get_logic_undelete(&table_name);
let mut sql = String::new();
sql.push_str("SELECT ");
sql.push_str(" 1 ");
sql.push_str(" FROM ");
sql.push_str(table_name.as_str());
let mut had_where = false;
if args.len() != 0 {
sql.push_str(" WHERE ");
sql.push_str(where_sql.as_str());
had_where = true;
}
if !logic_plugin_sql.is_empty() {
if had_where {
sql.push_str(" AND ");
} else {
sql.push_str(" WHERE ");
}
sql.push_str(logic_plugin_sql.as_str());
}
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let client = pools.get().await.unwrap();
let statement = client.prepare(sql.as_str()).await.unwrap();
let result = client.query(&statement, &args).await;
match result {
Ok(opt_row) => {
if opt_row.len() == 0 {
return Ok(false);
} else {
return Ok(true);
}
}
Err(e) => {
return Err(e);
}
}
}
fn get_logic_undelete(table_name: &str) -> String {
let logic_plugin_sql = if DATABASE_POOL.logic_plugin.is_some() {
if !DATABASE_POOL
.logic_plugin
.as_ref()
.unwrap()
.is_exclusion_table(table_name)
{
format!(
" \"{}\" = {} ",
DATABASE_POOL.logic_plugin.as_ref().unwrap().column(),
DATABASE_POOL.logic_plugin.as_ref().unwrap().un_deleted(),
)
} else {
EMPTY_STRING.to_string()
}
} else {
EMPTY_STRING.to_string()
};
logic_plugin_sql
}
pub fn format_like(obj: &Option<String>) -> Option<String> {
match obj {
Some(t) => {
let two = format!("%{}%", t);
Some(two)
}
None => None,
}
}
pub fn format_like_left(obj: &Option<String>) -> Option<String> {
match obj {
Some(t) => {
let two = format!("%{}", t);
Some(two)
}
None => None,
}
}
pub fn format_like_right(obj: &Option<String>) -> Option<String> {
match obj {
Some(t) => {
let two = format!("{}%", t);
Some(two)
}
None => None,
}
}
pub async fn transaction_delete_and_save<T, V>(
delete_column: &'static str,
delete_value: &V,
p: Vec<T>,
prefix: Option<String>,
suffix: Option<String>,
) -> Result<u64, Error>
where
T: Parameters,
V: ToSql + Sync,
{
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let mut client = pools.get().await.unwrap();
let transaction = client.transaction().await.unwrap();
let table_name = T::get_table_name(prefix.clone(), suffix.clone());
let sql = format!(
"DELETE FROM \"{}\" where \"{}\" = $1",
&table_name, delete_column
);
let statement = transaction.prepare(sql.as_str()).await.unwrap();
match transaction.execute(&statement, &[delete_value]).await {
Ok(_) => {
debug!("删除执行成功");
}
Err(e) => {
error!("删除执行失败: {}", e);
transaction.rollback().await.unwrap();
return Err(e);
}
}
for item in p.iter() {
let (sql, params) = item.gen_save(prefix.clone(), suffix.clone()).unwrap();
let statement = transaction.prepare(sql.as_str()).await.unwrap();
let result = transaction.execute(&statement, ¶ms).await;
match result {
Ok(_t) => {
trace!("SAVE 执行成功");
}
Err(e) => {
transaction.rollback().await.unwrap();
error!("{:?}", e);
return Err(e);
}
}
}
let result = transaction.commit().await;
if result.is_err() {
error!(" transaction.commit() 执行 提交失败!");
}
return Ok(0);
}
#[derive(Clone, Debug)]
pub struct TransactionBatchParam<'a> {
pub statement_sql: &'a str,
pub params: Vec<&'a (dyn ToSql + Sync)>,
}
pub async fn transaction_batch(param: Vec<TransactionBatchParam<'_>>) -> Result<u64, String> {
let pools: &Pool = DATABASE_POOL.pools.get().unwrap();
let mut client = pools.get().await.unwrap();
let transaction = client.transaction().await.unwrap();
for item in param.iter() {
let statement = transaction.prepare(item.statement_sql).await.unwrap();
match transaction.execute(&statement, &item.params).await {
Ok(t) => {
if t == 0 {
if !item.statement_sql.find("DELETE").is_some(){
transaction.rollback().await.unwrap();
error!("执行的SQL没有任何意义,响应行数为0");
return Err("执行的SQL没有任何意义,响应行数为0".to_string());
}
}
}
Err(e) => {
transaction.rollback().await.unwrap();
error!("{:?}", e);
return Err(e.to_string());
}
}
}
let result = transaction.commit().await;
if result.is_err() {
error!(" transaction.commit() 执行 提交失败!");
}
return Ok(0);
}