use crate::base::entity::{Entity};
use crate::base::error::DatabaseError;
use crate::base::param::ParamValue;
use tracing::{error, warn};
use std::option::Option;
use std::sync::{Arc};
use tokio::sync::Mutex;
use crate::base::mapping::Mapping;
/// Column access for a single result row.
///
/// Both accessors hand the column back as a [`ParamValue`] or report a
/// [`DatabaseError`]; how a failed lookup is classified is up to the implementor.
pub trait RowType {
    /// Converts the column at position `col_index` into a [`ParamValue`].
    ///
    /// NOTE(review): the index base is not fixed by this trait. The `Executor`
    /// helpers in this file pass `0` when they read a single leading column, so
    /// it is presumably zero-based; confirm against the driver implementations.
    fn col_to_v_by_index(&self, col_index: usize) -> Result<ParamValue, DatabaseError>
    where
        Self: Sized;

    /// Converts the column called `col_name` into a [`ParamValue`].
    fn col_to_v_by_name(&self, col_name: &str) -> Result<ParamValue, DatabaseError>
    where
        Self: Sized;
}
/// Asynchronous database executor abstraction.
///
/// Implementors provide the low-level [`Executor::query`] / [`Executor::execute`]
/// primitives, connection acquisition ([`Executor::get_conn_ref`],
/// [`Executor::get_conn`]) and [`Executor::transaction_basic_exec`]. Every other
/// method has a default body built on top of those.
// The lint warns that callers cannot put auto-trait bounds (e.g. `Send`) on the
// futures returned by `async fn` in a public trait; it is silenced here.
#[allow(async_fn_in_trait)]
pub trait Executor {
    /// Row handle passed to mapper closures; columns are read through [`RowType`].
    type Row<'a>: RowType + 'a;

    /// Driver-specific connection type.
    type Conn;

    /// Low-level query primitive supplied by the implementor.
    ///
    /// Intended contract (not enforced by this trait): run `sql` with `params` on
    /// `conn`, turn each returned row into a `T` with `mapper`, then pass the
    /// collected values to `processor` to build the final `R`.
    async fn query<T, R, F, Q>(
        &self,
        conn: Arc<Mutex<Self::Conn>>,
        sql: &str,
        params: &Vec<ParamValue>,
        mapper: F,
        processor: Q,
    ) -> Result<R, DatabaseError>
    where
        T: Send + 'static,
        R: Send + 'static,
        F: for<'a> Fn(&Self::Row<'a>) -> Result<T, DatabaseError> + Send + Sync + 'static,
        Q: FnOnce(Vec<T>) -> Result<R, DatabaseError> + Send + 'static;

    /// Low-level statement primitive supplied by the implementor.
    ///
    /// NOTE(review): the `u64` is presumably the number of affected rows; confirm
    /// against the driver implementations.
    async fn execute(
        &self,
        conn: Arc<Mutex<Self::Conn>>,
        sql: &str,
        params: &Vec<ParamValue>,
    ) -> Result<u64, DatabaseError>;

    /// Logs the statement and runs it through [`Executor::execute`].
    ///
    /// The connection from [`Executor::get_conn_ref`] is used when that call
    /// succeeds; otherwise its error is discarded and a fresh connection from
    /// [`Executor::get_conn`] is wrapped and used instead.
    ///
    /// # Errors
    /// Returns the error from `get_conn` (fallback path) or from `execute`.
    async fn exec_basic(&self, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError> {
        // NOTE(review): SQL text and parameter values are logged at `warn` level;
        // consider a lower level if parameters may carry sensitive data.
        warn!("execute sql: {} with params: {:?}", sql, params);
        let conn = match self.get_conn_ref() {
            Ok(shared) => shared,
            // Fallback is deliberate: no shared connection means "open one for this call".
            Err(_) => Arc::new(Mutex::new(self.get_conn().await?)),
        };
        self.execute(conn, sql, params).await
    }

    /// Logs the query and runs it through [`Executor::query`].
    ///
    /// Connection selection is the same as in [`Executor::exec_basic`]: the
    /// connection from `get_conn_ref` if available, otherwise a fresh one from
    /// `get_conn`.
    ///
    /// # Errors
    /// Returns the error from `get_conn` (fallback path) or from `query`.
    async fn query_basic<T, R, F, Q>(
        &self,
        sql: &str,
        params: &Vec<ParamValue>,
        mapper: F,
        processor: Q,
    ) -> Result<R, DatabaseError>
    where
        T: Send + 'static,
        R: Send + 'static,
        F: for<'a> Fn(&Self::Row<'a>) -> Result<T, DatabaseError> + Send + 'static + Sync,
        Q: FnOnce(Vec<T>) -> Result<R, DatabaseError> + Send + 'static,
    {
        // NOTE(review): see `exec_basic` regarding the log level.
        warn!("query sql: {} with params: {:?}", sql, params);
        let conn = match self.get_conn_ref() {
            Ok(shared) => shared,
            Err(_) => Arc::new(Mutex::new(self.get_conn().await?)),
        };
        self.query(conn, sql, params, mapper, processor).await
    }

    /// Builds an `E` from `row` by reading every column listed in
    /// `E::column_names()` by name and storing it with `set_value_by_column_name`.
    ///
    /// # Errors
    /// Returns the first error produced by `col_to_v_by_name`.
    fn row_to_e<E>(row: &Self::Row<'_>) -> Result<E, DatabaseError>
    where
        E: Mapping,
    {
        let mut entity = E::new();
        for column in E::column_names() {
            let value = row.col_to_v_by_name(column)?;
            entity.set_value_by_column_name(column, value);
        }
        Ok(entity)
    }

    /// Returns the shared connection handle, or an error when none is available.
    ///
    /// NOTE(review): presumably this is the connection bound to the current
    /// scope or transaction; the default methods only rely on `Ok` meaning
    /// "use this one" and `Err` meaning "fall back to `get_conn`".
    fn get_conn_ref(&self) -> Result<Arc<Mutex<Self::Conn>>, DatabaseError>;

    /// Acquires a new owned connection.
    async fn get_conn(&self) -> Result<Self::Conn, DatabaseError>;

    /// Runs `sql` and maps every returned row to an `E` via [`Executor::row_to_e`].
    async fn query_some<E>(&self, sql: &str, params: &Vec<ParamValue>) -> Result<Vec<E>, DatabaseError>
    where
        E: Mapping,
    {
        self.query_basic::<E, Vec<E>, _, _>(
            sql,
            params,
            |row| Self::row_to_e(row),
            |results: Vec<E>| Ok(results),
        )
        .await
    }

    /// Runs `sql` and returns the first mapped row, or `None` when no row came back.
    ///
    /// All returned rows are still mapped before the first one is selected.
    async fn query_one<E>(&self, sql: &str, params: &Vec<ParamValue>) -> Result<Option<E>, DatabaseError>
    where
        E: Mapping,
    {
        self.query_basic::<E, Option<E>, _, _>(
            sql,
            params,
            |row| Self::row_to_e(row),
            |results: Vec<E>| Ok(results.into_iter().next()),
        )
        .await
    }

    /// Runs `sql` and converts column `0` of the first row into `Option<T>`.
    ///
    /// Returns `Ok(None)` when the query yields no rows; otherwise the result of
    /// `Option::<T>::from` on the column value.
    ///
    /// # Errors
    /// A failure to read column `0` is returned as an error (it no longer panics).
    async fn query_one_value<T>(
        &self,
        sql: &str,
        params: &Vec<ParamValue>,
    ) -> Result<Option<T>, DatabaseError>
    where
        Option<T>: From<ParamValue> + Send + Sync + 'static,
    {
        self.query_basic::<_, _, _, _>(
            sql,
            params,
            // Propagate the column-read error to the caller instead of unwrapping.
            |row| row.col_to_v_by_index(0),
            // Move the first value out of the Vec; no clone needed.
            |results: Vec<ParamValue>| match results.into_iter().next() {
                Some(value) => Ok(value.into()),
                None => Ok(None),
            },
        )
        .await
    }

    /// Runs a counting query and returns column `0` of the first row as `u64`.
    ///
    /// An empty result set yields `0`, and a negative value is clamped to `0`
    /// rather than wrapping around.
    ///
    /// # Errors
    /// A failure to read column `0` is returned as an error (it no longer panics).
    async fn query_count(&self, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError> {
        self.query_basic::<i64, u64, _, _>(
            sql,
            params,
            |row| {
                let value = row.col_to_v_by_index(0)?;
                Ok(value.into())
            },
            |results: Vec<i64>| {
                // `first()` avoids the out-of-bounds panic on an empty result set.
                let count = results.first().copied().unwrap_or(0);
                Ok(u64::try_from(count).unwrap_or(0))
            },
        )
        .await
    }

    /// Runs an insert statement through the query path and converts column `0` of
    /// the first returned row into the entity key `E::K`.
    ///
    /// Returns `Ok(None)` when the statement returns no rows.
    ///
    /// NOTE(review): presumably `sql` is expected to return the generated key
    /// (for example through a `RETURNING` clause); confirm with callers.
    async fn insert<E>(&self, sql: &str, params: &Vec<ParamValue>) -> Result<Option<E::K>, DatabaseError>
    where
        E: Entity,
    {
        self.query_basic::<ParamValue, Option<E::K>, _, _>(
            sql,
            params,
            |row| {
                // Best-effort by design: an unreadable key column is logged and
                // reported as `Null` rather than failing the insert.
                Ok(row.col_to_v_by_index(0).unwrap_or_else(|e| {
                    error!("Error: {}", e);
                    ParamValue::Null
                }))
            },
            |results: Vec<ParamValue>| Ok(results.into_iter().next().map(|key| key.into())),
        )
        .await
    }

    /// Runs a batch insert statement via [`Executor::exec_basic`].
    ///
    /// The `E` type parameter is not used by the default body.
    async fn insert_batch<E>(&self, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError>
    where
        E: Entity,
    {
        self.exec_basic(sql, params).await
    }

    /// Runs a delete statement via [`Executor::exec_basic`].
    async fn delete(&self, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError> {
        self.exec_basic(sql, params).await
    }

    /// Runs an update statement via [`Executor::exec_basic`].
    async fn update(&self, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError> {
        self.exec_basic(sql, params).await
    }

    /// Runs an arbitrary statement via [`Executor::exec_basic`].
    async fn execute_sql(&self, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError> {
        self.exec_basic(sql, params).await
    }

    /// Begins a transaction. The default body reports the operation as unsupported.
    async fn start_transaction(&self) -> Result<(), DatabaseError> {
        Err(DatabaseError::NotSupportedError("start_transaction".to_string()))
    }

    /// Commits the current transaction. The default body reports the operation as unsupported.
    async fn commit(&self) -> Result<(), DatabaseError> {
        Err(DatabaseError::NotSupportedError("commit".to_string()))
    }

    /// Rolls back the current transaction. The default body reports the operation as unsupported.
    async fn rollback(&self) -> Result<(), DatabaseError> {
        Err(DatabaseError::NotSupportedError("rollback".to_string()))
    }

    /// Runs the future produced by `func` under implementor-defined transaction handling.
    ///
    /// NOTE(review): begin/commit/rollback behaviour is entirely up to the
    /// implementor; nothing in this trait constrains it.
    async fn transaction_basic_exec<F, T, Fut>(&self, func: F) -> Result<T, DatabaseError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<T, DatabaseError>>;
}
/// Acquires a connection from `$self` and runs `$func` with that connection
/// registered on `$register`.
///
/// Expansion, in order:
/// 1. `$self.get_conn().await?` obtains an owned connection; the `?` returns
///    early from the enclosing function on failure.
/// 2. The connection is wrapped in `Arc<tokio::sync::Mutex<_>>`.
/// 3. `$register.scope(conn, async { $func().await }).await` drives the
///    caller-supplied future, and its output is the value of the macro.
///
/// Must be invoked inside an `async` context whose return type can absorb the `?`.
///
/// NOTE(review): `$register` is presumably a task-local key exposing
/// `scope(value, future)`; confirm at the call sites.
#[macro_export]
macro_rules! with_conn_scope {
    ($register:expr, $self:expr, $func:expr) => {{
        use std::sync::Arc;
        use tokio::sync::Mutex;
        let conn = Arc::new(Mutex::new($self.get_conn().await?));
        $register.scope(conn, async { $func().await }).await
    }};
}