use deadpool_sqlite::{Object, Pool};
use huiyu_db_mapper_core::base::error::DatabaseError;
use huiyu_db_mapper_core::base::param::ParamValue;
use huiyu_db_mapper_core::pool::datasource::get_datasource_name;
use huiyu_db_mapper_core::pool::db_manager::DbManager;
use huiyu_db_mapper_core::sql::executor::{Executor, RowType};
use huiyu_db_mapper_core::with_conn_scope;
use rusqlite::types::ValueRef;
use rusqlite::ToSql;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task_local;
use tracing::{trace};
// Task-local slot for the shared, mutex-guarded SQLite pooled connection (`Object`).
// `SqliteSqlExecutor::get_conn_ref` reads it with `try_get`, and
// `transaction_basic_exec` hands it to `with_conn_scope!`.
// NOTE(review): presumably `with_conn_scope!` is what populates this slot for the
// duration of the wrapped future - confirm against the macro definition.
task_local! {
pub static SQLITE_CONN_REGISTER : Arc<Mutex<Object>>;
}
/// Zero-sized executor type; its `Executor` implementation runs SQL against SQLite
/// connections (`deadpool_sqlite::Object`) taken from a `deadpool_sqlite` pool.
#[derive(Clone)]
pub struct SqliteSqlExecutor;
/// Ready-made constant instance of [`SqliteSqlExecutor`].
pub const SQLITE_SQL_EXECUTOR: SqliteSqlExecutor = SqliteSqlExecutor;
/// Thin borrowed wrapper around a `rusqlite::Row` so that it can implement [`RowType`].
pub struct SqliteRow<'a>(&'a rusqlite::Row<'a>);

impl<'a> RowType for SqliteRow<'a> {
    /// Reads the column at position `col_index` and converts it into a [`ParamValue`].
    ///
    /// A column that cannot be fetched is deliberately treated as best-effort:
    /// the failure is logged at `trace` level and `ParamValue::Null` is returned
    /// instead of an error. Conversion failures of a fetched value are propagated.
    fn col_to_v_by_index(&self, col_index: usize) -> Result<ParamValue, DatabaseError>
    where
        Self: Sized,
    {
        match self.0.get_ref(col_index) {
            Ok(raw) => value_to_param_value(raw),
            Err(fetch_err) => {
                trace!("Fail to get Column {}", fetch_err);
                Ok(ParamValue::Null)
            }
        }
    }

    /// Reads the column named `col_name` and converts it into a [`ParamValue`].
    ///
    /// Same best-effort policy as [`Self::col_to_v_by_index`]: a column that cannot
    /// be fetched is logged at `trace` level and reported as `ParamValue::Null`.
    fn col_to_v_by_name(&self, col_name: &str) -> Result<ParamValue, DatabaseError>
    where
        Self: Sized,
    {
        match self.0.get_ref(col_name) {
            Ok(raw) => value_to_param_value(raw),
            Err(fetch_err) => {
                trace!("Fail to get Column {}", fetch_err);
                Ok(ParamValue::Null)
            }
        }
    }
}
/// SQLite implementation of [`Executor`] on top of `deadpool_sqlite` pooled connections.
///
/// Every statement is executed inside `Object::interact`, i.e. in the closure that
/// deadpool runs with access to the underlying `rusqlite` connection. The tokio mutex
/// around the connection stays locked until the interaction has finished.
impl Executor for SqliteSqlExecutor {
    /// Row view handed to the row mapper.
    type Row<'a> = SqliteRow<'a>;
    /// Pooled SQLite connection handle.
    type Conn = Object;

    /// Runs a query and post-processes the mapped rows.
    ///
    /// `sql` is prepared, `params` are bound positionally, each returned row is passed
    /// to `mapper`, and the collected results are finally handed to `processor`.
    /// Both callbacks are invoked inside the `interact` closure.
    ///
    /// # Errors
    /// * `DatabaseError::ExecuteError` - preparing or executing the statement failed,
    ///   or the `interact` call itself failed.
    /// * `DatabaseError::RowConvertError` - fetching a row or mapping it failed.
    /// * Any error returned by parameter conversion or by `processor`.
    async fn query<T, R, F, Q>(
        &self,
        conn: Arc<Mutex<Self::Conn>>,
        sql: &str,
        params: &Vec<ParamValue>,
        mapper: F,
        processor: Q,
    ) -> Result<R, DatabaseError>
    where
        T: Send + 'static,
        R: Send + 'static,
        F: for<'a> Fn(&Self::Row<'a>) -> Result<T, DatabaseError> + Send + 'static,
        Q: FnOnce(Vec<T>) -> Result<R, DatabaseError> + Send + 'static,
    {
        // Owned copies are moved into the `interact` closure.
        let sql = sql.to_string();
        let params = params.clone();
        let conn = conn.lock().await;
        conn.interact(move |conn| {
            let mut stmt = conn
                .prepare(sql.as_str())
                .map_err(|e| DatabaseError::ExecuteError(format!("Failed to prepare statement: {:?}", e)))?;
            // The wrappers own the normalized values; `to_sql_values` only borrows from them.
            let param_refs = ParamValueWrapper::convert_param_values(&params)?;
            let to_sql_values = param_refs
                .iter()
                .map(|x| x.as_sql_param())
                .collect::<Result<Vec<_>, DatabaseError>>()?;
            let mut rows = stmt
                .query(&*to_sql_values)
                .map_err(|e| DatabaseError::ExecuteError(format!("Failed to execute query: {:?}", e)))?;
            let mut results = Vec::new();
            // `rusqlite::Rows` is not a std `Iterator`, hence the explicit `next()` loop.
            while let Some(row) = rows
                .next()
                .map_err(|e| DatabaseError::RowConvertError(format!("Failed to fetch row: {:?}", e)))?
            {
                results.push(
                    mapper(&SqliteRow(row))
                        .map_err(|e| DatabaseError::RowConvertError(format!("Failed to map row: {:?}", e)))?,
                );
            }
            processor(results)
        })
        .await
        .map_err(|e| DatabaseError::ExecuteError(format!("Database interaction failed: {:?}", e)))?
    }

    /// Executes a statement that returns no rows.
    ///
    /// Returns the count reported by `rusqlite::Connection::execute`, widened to `u64`.
    ///
    /// # Errors
    /// * `DatabaseError::ExecuteError` - the statement or the `interact` call failed.
    /// * Any error returned by parameter conversion.
    async fn execute(
        &self,
        conn: Arc<Mutex<Self::Conn>>,
        sql: &str,
        params: &Vec<ParamValue>,
    ) -> Result<u64, DatabaseError> {
        // Owned copies are moved into the `interact` closure.
        let sql = sql.to_string();
        let params = params.clone();
        let conn = conn.lock().await;
        conn.interact(move |conn| {
            let param_refs = ParamValueWrapper::convert_param_values(&params)?;
            let to_sql_values = param_refs
                .iter()
                .map(|x| x.as_sql_param())
                .collect::<Result<Vec<_>, DatabaseError>>()?;
            let res = conn
                .execute(sql.as_str(), &*to_sql_values)
                .map_err(|e| DatabaseError::ExecuteError(format!("Failed to execute statement: {:?}", e)))?;
            Ok(res as u64)
        })
        .await
        .map_err(|e| DatabaseError::ExecuteError(format!("Database interaction failed: {:?}", e)))?
    }

    /// Returns the connection stored in the `SQLITE_CONN_REGISTER` task-local.
    ///
    /// # Errors
    /// `DatabaseError::AccessError` when the task-local cannot be read, i.e. when
    /// `try_get` fails.
    fn get_conn_ref(&self) -> Result<Arc<Mutex<Self::Conn>>, DatabaseError> {
        SQLITE_CONN_REGISTER
            .try_get()
            .map_err(|_| DatabaseError::AccessError("SQLITE_CONN_REGISTER is not set".to_string()))
    }

    /// Checks a fresh connection out of the pool registered for the current datasource name.
    ///
    /// # Errors
    /// * Whatever `DbManager::get_instance` returns when the lookup fails.
    /// * `DatabaseError::ConnectCanNotGetError` when the pool cannot provide a connection.
    async fn get_conn(&self) -> Result<Self::Conn, DatabaseError> {
        let manager: Arc<DbManager<Pool>> = DbManager::get_instance(get_datasource_name().as_str())?;
        manager
            .get_pool()
            .get()
            .await
            .map_err(|e| DatabaseError::ConnectCanNotGetError(format!("Failed to get database connection: {:?}", e)))
    }

    /// Issues `BEGIN TRANSACTION` on the task-local connection.
    ///
    /// # Errors
    /// * `DatabaseError::AccessError` - no task-local connection, or `interact` failed.
    /// * `DatabaseError::ExecuteError` - SQLite rejected the statement.
    async fn start_transaction(&self) -> Result<(), DatabaseError> {
        let conn = self.get_conn_ref()?;
        let conn = conn.lock().await;
        // The outer `?` handles the `interact` failure, the inner `?` the statement failure.
        conn.interact(move |conn| {
            conn.execute("BEGIN TRANSACTION", [])
                .map_err(|e| DatabaseError::ExecuteError(format!("Failed to start transaction: {:?}", e)))
        })
        .await
        .map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
        Ok(())
    }

    /// Issues `COMMIT` on the task-local connection.
    ///
    /// # Errors
    /// * `DatabaseError::AccessError` - no task-local connection, or `interact` failed.
    /// * `DatabaseError::ExecuteError` - SQLite rejected the statement.
    async fn commit(&self) -> Result<(), DatabaseError> {
        let conn = self.get_conn_ref()?;
        let conn = conn.lock().await;
        conn.interact(move |conn| {
            // Message previously said "start transaction" (copy-paste slip).
            conn.execute("COMMIT", [])
                .map_err(|e| DatabaseError::ExecuteError(format!("Failed to commit transaction: {:?}", e)))
        })
        .await
        .map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
        Ok(())
    }

    /// Issues `ROLLBACK` on the task-local connection.
    ///
    /// # Errors
    /// * `DatabaseError::AccessError` - no task-local connection, or `interact` failed.
    /// * `DatabaseError::ExecuteError` - SQLite rejected the statement.
    async fn rollback(&self) -> Result<(), DatabaseError> {
        let conn = self.get_conn_ref()?;
        let conn = conn.lock().await;
        conn.interact(move |conn| {
            conn.execute("ROLLBACK", [])
                .map_err(|e| DatabaseError::ExecuteError(format!("Failed to rollback transaction: {:?}", e)))
        })
        .await
        .map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
        Ok(())
    }

    /// Runs `func` through the `with_conn_scope!` macro using `SQLITE_CONN_REGISTER`.
    ///
    /// NOTE(review): presumably the macro binds a connection to the task-local for the
    /// lifetime of the future returned by `func` - confirm against the macro definition.
    async fn transaction_basic_exec<F, T, Fut>(&self, func: F) -> Result<T, DatabaseError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<T, DatabaseError>>,
    {
        with_conn_scope!(SQLITE_CONN_REGISTER, self, func)
    }
}
struct ParamValueWrapper(ParamValue);
impl ParamValueWrapper {
fn convert_param_values(param_values: &Vec<ParamValue>) -> Result<Vec<ParamValueWrapper>,DatabaseError> {
param_values.iter().map(|param_value: &ParamValue|{
match param_value {
ParamValue::U64(x) => Ok(ParamValueWrapper(ParamValue::I64(*x as i64))),
ParamValue::U32(x) => Ok(ParamValueWrapper(ParamValue::U32(*x ))),
ParamValue::U16(x) => Ok(ParamValueWrapper(ParamValue::U16(*x))),
ParamValue::U8(x) => Ok(ParamValueWrapper(ParamValue::U8(*x))) ,
ParamValue::I64(x) => Ok(ParamValueWrapper(ParamValue::I64(*x))),
ParamValue::I32(x) => Ok(ParamValueWrapper(ParamValue::I32(*x))),
ParamValue::I16(x) => Ok(ParamValueWrapper(ParamValue::I16(*x))),
ParamValue::I8(x) => Ok(ParamValueWrapper(ParamValue::I8(*x))),
ParamValue::String(x) => Ok(ParamValueWrapper(ParamValue::String(x.to_string()))),
ParamValue::F32(x) => Ok(ParamValueWrapper(ParamValue::F32(*x))),
ParamValue::F64(x) => Ok(ParamValueWrapper(ParamValue::F64(*x))),
ParamValue::Bool(x) => Ok(ParamValueWrapper(ParamValue::Bool(*x))),
ParamValue::Blob(x) => Ok(ParamValueWrapper(ParamValue::Blob(x.to_vec()))),
ParamValue::Clob(x) => Ok(ParamValueWrapper(ParamValue::String(String::from_utf8(x.to_vec()).unwrap()))),
ParamValue::DateTime(x) => Ok(ParamValueWrapper(ParamValue::I64(x.timestamp()))),
_ => Err(DatabaseError::ConvertError(format!("Can't Convert Postgres Error: {:?}", param_value)))
}
}).collect()
}
fn as_sql_param(&self) -> Result<&dyn ToSql, DatabaseError> {
match &self.0 {
ParamValue::Null => Ok(&rusqlite::types::Null),
ParamValue::I64(v) => Ok(v),
ParamValue::I32(v) => Ok(v),
ParamValue::I16(v) => Ok(v),
ParamValue::I8(v) => Ok(v),
ParamValue::String(v) => Ok(v ),
ParamValue::F64(v) => Ok(v ) ,
ParamValue::F32(v) => Ok(v),
ParamValue::Bool(v) => Ok(v),
ParamValue::Blob(v) => Ok(v),
ParamValue::Clob(v) => Ok(v),
ParamValue::U32(v) => Ok(v),
ParamValue::U16(v) => Ok(v),
ParamValue::U8(v) => Ok(v),
_ => Err(DatabaseError::ConvertError(format!("Can't Convert Sqlite Error: {:?}", self.0)))
}
}
}
/// Converts a borrowed SQLite column value into an owned [`ParamValue`].
///
/// `Null`, `Integer` and `Real` map to `Null`, `I64` and `F64`; `Blob` bytes are copied
/// into `Blob`; `Text` bytes are copied and must be valid UTF-8 to become `String`.
///
/// # Errors
/// `DatabaseError::CommonError` when a `Text` value is not valid UTF-8.
fn value_to_param_value(value: ValueRef<'_>) -> Result<ParamValue, DatabaseError> {
    let converted = match value {
        ValueRef::Null => ParamValue::Null,
        ValueRef::Integer(int) => ParamValue::I64(int),
        ValueRef::Real(real) => ParamValue::F64(real),
        ValueRef::Text(bytes) => String::from_utf8(bytes.to_vec())
            .map(ParamValue::String)
            // The message text means "string conversion error".
            .map_err(|e| DatabaseError::CommonError(format!("字符串转换异常: {}", e)))?,
        ValueRef::Blob(bytes) => ParamValue::Blob(bytes.to_vec()),
    };
    Ok(converted)
}