Skip to main content

huiyu_db_mapper_sqlite/sqlite/
sqlite_executor.rs

1use deadpool_sqlite::{Object, Pool};
2use huiyu_db_mapper_core::base::error::DatabaseError;
3use huiyu_db_mapper_core::base::param::ParamValue;
4use huiyu_db_mapper_core::pool::datasource::get_datasource_name;
5use huiyu_db_mapper_core::pool::db_manager::DbManager;
6use huiyu_db_mapper_core::sql::executor::{Executor, RowType};
7use huiyu_db_mapper_core::with_conn_scope;
8use rusqlite::types::ValueRef;
9use rusqlite::ToSql;
10use std::sync::Arc;
11use tokio::sync::Mutex;
12use tokio::task_local;
13use tracing::{trace};
14
15task_local! {
16    pub static SQLITE_CONN_REGISTER : Arc<Mutex<Object>>;
17}
18#[derive(Clone)]
19pub struct SqliteSqlExecutor;
20// 全局单例
21pub const SQLITE_SQL_EXECUTOR: SqliteSqlExecutor = SqliteSqlExecutor;
22
23pub struct SqliteRow<'a>(&'a rusqlite::Row<'a>);
24
25impl<'a> RowType for SqliteRow<'a> {
26    fn col_to_v_by_index(&self, col_index: usize) -> Result<ParamValue, DatabaseError>
27    where
28        Self: Sized
29    {
30        let val = self.0.get_ref(col_index);//.map_err(|e| DatabaseError::CommonError(format!("Failed to get column value: {:?}", e)))?;
31        if val.is_err(){
32            trace!("Fail to get Column {}",val.as_ref().err().unwrap());
33            return Ok(ParamValue::Null);
34        }
35        Ok(value_to_param_value(val.unwrap())?)
36    }
37
38    fn col_to_v_by_name(&self, col_name: &str) -> Result<ParamValue, DatabaseError>
39    where
40        Self: Sized
41    {
42        let val = self.0.get_ref(col_name);//.map_err(|e| DatabaseError::CommonError(format!("Failed to get column value: {:?}", e)))?;
43        if val.is_err() {
44            trace!("Fail to get Column {}",val.as_ref().err().unwrap());
45            return Ok(ParamValue::Null)
46        }
47        Ok(value_to_param_value(val.unwrap())?)
48    }
49}
50// 查询基本实现
51impl Executor for SqliteSqlExecutor {
52    type Row<'a> = SqliteRow<'a>;
53    type Conn = Object;
54
55
56    async fn query<T, R, F, Q>(&self, conn: Arc<Mutex<Self::Conn>>, sql: &str, params: &Vec<ParamValue>, mapper: F, processor: Q) -> Result<R, DatabaseError>
57    where
58        T: Send + 'static,
59        R: Send + 'static,
60        F: for<'a> Fn(&Self::Row<'a>) -> Result<T, DatabaseError> + Send + 'static,
61        Q: FnOnce(Vec<T>) -> Result<R, DatabaseError> + Send + 'static
62    {
63        let sql = sql.to_string();
64        let params = params.clone();
65        let conn = conn.lock().await;
66        conn.interact(move |conn| {
67            let mut stmt = conn.prepare(sql.as_str()).map_err(|e| DatabaseError::ExecuteError(format!("Failed to prepare statement: {:?}", e)))?;
68            let param_refs = ParamValueWrapper::convert_param_values(&params)?;
69            let to_sql_values = param_refs.iter().map(|x| x.as_sql_param()).collect::<Result<Vec<_>, DatabaseError>>()?;
70
71            let mut rows = stmt.query(&*to_sql_values).map_err(|e| DatabaseError::ExecuteError(format!("Failed to execute query: {:?}", e)))?;
72            let mut results = Vec::new();
73
74            while let Some(row) = rows.next().map_err(|e| DatabaseError::RowConvertError(format!("Failed to fetch row: {:?}", e)))? {
75                results.push(mapper(&SqliteRow(row)).map_err(|e| DatabaseError::RowConvertError(format!("Failed to map row: {:?}", e)))?);
76            }
77
78            processor(results)
79        }).await.map_err(|e| DatabaseError::ExecuteError(format!("Database interaction failed: {:?}", e)))?
80    }
81
82    async fn execute(&self, conn: Arc<Mutex<Self::Conn>>, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError> {
83        let sql = sql.to_string();
84        let params = params.clone();
85        let conn = conn.lock().await;
86        conn.interact(move |conn| {
87            let param_refs = ParamValueWrapper::convert_param_values(&params)?;
88            let to_sql_values = param_refs.iter().map(|x| x.as_sql_param()).collect::<Result<Vec<_>, DatabaseError>>()?;
89            let res = conn.execute(sql.as_str(), &*to_sql_values).map_err(|e| DatabaseError::ExecuteError(format!("Failed to execute statement: {:?}", e)))?;
90            Ok(res as u64)
91        }).await.map_err(|e| DatabaseError::ExecuteError(format!("Database interaction failed: {:?}", e)))?
92    }
93
94    fn get_conn_ref(&self) -> Result<Arc<Mutex<Self::Conn>>, DatabaseError> {
95        let c = SQLITE_CONN_REGISTER.try_get();
96        if c.is_err() {
97            return Err(DatabaseError::AccessError("SQLITE_CONN_REGISTER is not set".to_string()));
98        }
99        Ok(c.unwrap())
100    }
101
102    async fn get_conn(&self) -> Result<Self::Conn,DatabaseError> {
103        let p:Arc<DbManager<Pool>> = DbManager::get_instance(get_datasource_name().as_str())?;
104        let conn = p.get_pool().get().await.map_err(|e| DatabaseError::ConnectCanNotGetError(format!("Failed to get database connection: {:?}", e)))?;
105        Ok(conn)
106    }
107
108    async fn start_transaction(&self) -> Result<(), DatabaseError> {
109        let conn = self.get_conn_ref()?;
110        let conn = conn.lock().await;
111        conn.interact(move |conn| {
112            conn.execute("BEGIN TRANSACTION", []).map_err(|e| DatabaseError::ExecuteError(format!("Failed to start transaction: {:?}", e)))
113        }).await.map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
114        Ok(())
115    }
116
117    async fn commit(&self) -> Result<(), DatabaseError> {
118        let conn = self.get_conn_ref()?;
119        let conn = conn.lock().await;
120        conn.interact(move |conn| {
121            conn.execute("COMMIT", []).map_err(|e| DatabaseError::ExecuteError(format!("Failed to start transaction: {:?}", e)))
122        }).await.map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
123        Ok(())
124    }
125
126    async fn rollback(&self) -> Result<(), DatabaseError> {
127        let conn = self.get_conn_ref()?;
128        let conn = conn.lock().await;
129        conn.interact(move |conn| {
130            conn.execute("ROLLBACK", []).map_err(|e| DatabaseError::ExecuteError(format!("Failed to rollback transaction: {:?}", e)))
131        }).await.map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
132        Ok(())
133    }
134
135    async fn transaction_basic_exec<F, T, Fut>(&self, func: F) -> Result<T, DatabaseError>
136    where
137        F: FnOnce() -> Fut,
138        Fut: Future<Output=Result<T, DatabaseError>>
139    {
140        with_conn_scope!(SQLITE_CONN_REGISTER, self, func)
141    }
142
143    // async fn transactional_exec<F, T, Fut>(&self, func: F) -> Result<T, DatabaseError>
144    // where
145    //     F: FnOnce() -> Fut ,  // BF 返回 Future
146    //     Fut: Future<Output = Result<T, DatabaseError>>,
147    // {
148    //     // let conn = self.get_conn().await?;
149    //     // let res = SQLITE_CONN_REGISTER.scope(Arc::new(Mutex::new(conn)), async {
150    //     //     self.transaction_exec_basic(func).await
151    //     // }).await;
152    //     // res
153    //     with_conn_scope!(SQLITE_CONN_REGISTER, self, func)
154    // }
155}
156
157struct ParamValueWrapper(ParamValue);
158
159impl ParamValueWrapper {
160
161    fn convert_param_values(param_values: &Vec<ParamValue>) -> Result<Vec<ParamValueWrapper>,DatabaseError> {
162        param_values.iter().map(|param_value: &ParamValue|{
163            match param_value {
164                ParamValue::U64(x) => Ok(ParamValueWrapper(ParamValue::I64(*x as i64))),
165                ParamValue::U32(x) => Ok(ParamValueWrapper(ParamValue::U32(*x ))),
166                ParamValue::U16(x) => Ok(ParamValueWrapper(ParamValue::U16(*x))),
167                ParamValue::U8(x) => Ok(ParamValueWrapper(ParamValue::U8(*x)))        ,
168                ParamValue::I64(x) => Ok(ParamValueWrapper(ParamValue::I64(*x))),
169                ParamValue::I32(x) => Ok(ParamValueWrapper(ParamValue::I32(*x))),
170                ParamValue::I16(x) => Ok(ParamValueWrapper(ParamValue::I16(*x))),
171                ParamValue::I8(x) => Ok(ParamValueWrapper(ParamValue::I8(*x))),
172                ParamValue::String(x) => Ok(ParamValueWrapper(ParamValue::String(x.to_string()))),
173                ParamValue::F32(x) => Ok(ParamValueWrapper(ParamValue::F32(*x))),
174                ParamValue::F64(x) => Ok(ParamValueWrapper(ParamValue::F64(*x))),
175                ParamValue::Bool(x) => Ok(ParamValueWrapper(ParamValue::Bool(*x))),
176                ParamValue::Blob(x) => Ok(ParamValueWrapper(ParamValue::Blob(x.to_vec()))),
177                ParamValue::Clob(x) => Ok(ParamValueWrapper(ParamValue::String(String::from_utf8(x.to_vec()).unwrap()))),
178                ParamValue::DateTime(x) => Ok(ParamValueWrapper(ParamValue::I64(x.timestamp()))),
179                _ => Err(DatabaseError::ConvertError(format!("Can't Convert Postgres Error: {:?}", param_value)))
180            }
181        }).collect()
182    }
183    fn as_sql_param(&self) -> Result<&dyn ToSql, DatabaseError> {
184        match &self.0 {
185            ParamValue::Null => Ok(&rusqlite::types::Null),
186            ParamValue::I64(v) => Ok(v),
187            ParamValue::I32(v) => Ok(v),
188            ParamValue::I16(v) => Ok(v),
189            ParamValue::I8(v) => Ok(v),
190            ParamValue::String(v) => Ok(v ),
191            ParamValue::F64(v) => Ok(v )      ,
192            ParamValue::F32(v) => Ok(v),
193            ParamValue::Bool(v) => Ok(v),
194            ParamValue::Blob(v) => Ok(v),
195            ParamValue::Clob(v) => Ok(v),
196            ParamValue::U32(v) => Ok(v),
197            ParamValue::U16(v) => Ok(v),
198            ParamValue::U8(v) => Ok(v),
199            _ => Err(DatabaseError::ConvertError(format!("Can't Convert Sqlite Error: {:?}", self.0)))
200        }
201    }
202
203}
204
205
206fn value_to_param_value(value: ValueRef<'_>) -> Result<ParamValue, DatabaseError> {
207    let param_value;
208    match value {
209        ValueRef::Null => param_value = ParamValue::Null,
210        ValueRef::Integer(v) => param_value = ParamValue::I64(v),
211        ValueRef::Real(v) => param_value = ParamValue::F64(v),
212        ValueRef::Text(v) => {
213            let s = String::from_utf8(v.to_vec());
214            match s {
215                Ok(s) => param_value = ParamValue::String(s),
216                Err(e) => {
217                    return Err(DatabaseError::CommonError(format!("字符串转换异常: {}", e)));
218                }
219            }
220        }
221        ValueRef::Blob(v) => param_value = ParamValue::Blob(v.to_vec()),
222    }
223    Ok(param_value)
224}