huiyu_db_mapper_sqlite/sqlite/
sqlite_executor.rs1use deadpool_sqlite::{Object, Pool};
2use huiyu_db_mapper_core::base::error::DatabaseError;
3use huiyu_db_mapper_core::base::param::ParamValue;
4use huiyu_db_mapper_core::pool::datasource::get_datasource_name;
5use huiyu_db_mapper_core::pool::db_manager::DbManager;
6use huiyu_db_mapper_core::sql::executor::{Executor, RowType};
7use huiyu_db_mapper_core::with_conn_scope;
8use rusqlite::types::ValueRef;
9use rusqlite::ToSql;
10use std::sync::Arc;
11use tokio::sync::Mutex;
12use tokio::task_local;
13use tracing::{trace};
14
15task_local! {
16 pub static SQLITE_CONN_REGISTER : Arc<Mutex<Object>>;
17}
18#[derive(Clone)]
19pub struct SqliteSqlExecutor;
20pub const SQLITE_SQL_EXECUTOR: SqliteSqlExecutor = SqliteSqlExecutor;
22
23pub struct SqliteRow<'a>(&'a rusqlite::Row<'a>);
24
25impl<'a> RowType for SqliteRow<'a> {
26 fn col_to_v_by_index(&self, col_index: usize) -> Result<ParamValue, DatabaseError>
27 where
28 Self: Sized
29 {
30 let val = self.0.get_ref(col_index);if val.is_err(){
32 trace!("Fail to get Column {}",val.as_ref().err().unwrap());
33 return Ok(ParamValue::Null);
34 }
35 Ok(value_to_param_value(val.unwrap())?)
36 }
37
38 fn col_to_v_by_name(&self, col_name: &str) -> Result<ParamValue, DatabaseError>
39 where
40 Self: Sized
41 {
42 let val = self.0.get_ref(col_name);if val.is_err() {
44 trace!("Fail to get Column {}",val.as_ref().err().unwrap());
45 return Ok(ParamValue::Null)
46 }
47 Ok(value_to_param_value(val.unwrap())?)
48 }
49}
50impl Executor for SqliteSqlExecutor {
52 type Row<'a> = SqliteRow<'a>;
53 type Conn = Object;
54
55
56 async fn query<T, R, F, Q>(&self, conn: Arc<Mutex<Self::Conn>>, sql: &str, params: &Vec<ParamValue>, mapper: F, processor: Q) -> Result<R, DatabaseError>
57 where
58 T: Send + 'static,
59 R: Send + 'static,
60 F: for<'a> Fn(&Self::Row<'a>) -> Result<T, DatabaseError> + Send + 'static,
61 Q: FnOnce(Vec<T>) -> Result<R, DatabaseError> + Send + 'static
62 {
63 let sql = sql.to_string();
64 let params = params.clone();
65 let conn = conn.lock().await;
66 conn.interact(move |conn| {
67 let mut stmt = conn.prepare(sql.as_str()).map_err(|e| DatabaseError::ExecuteError(format!("Failed to prepare statement: {:?}", e)))?;
68 let param_refs = ParamValueWrapper::convert_param_values(¶ms)?;
69 let to_sql_values = param_refs.iter().map(|x| x.as_sql_param()).collect::<Result<Vec<_>, DatabaseError>>()?;
70
71 let mut rows = stmt.query(&*to_sql_values).map_err(|e| DatabaseError::ExecuteError(format!("Failed to execute query: {:?}", e)))?;
72 let mut results = Vec::new();
73
74 while let Some(row) = rows.next().map_err(|e| DatabaseError::RowConvertError(format!("Failed to fetch row: {:?}", e)))? {
75 results.push(mapper(&SqliteRow(row)).map_err(|e| DatabaseError::RowConvertError(format!("Failed to map row: {:?}", e)))?);
76 }
77
78 processor(results)
79 }).await.map_err(|e| DatabaseError::ExecuteError(format!("Database interaction failed: {:?}", e)))?
80 }
81
82 async fn execute(&self, conn: Arc<Mutex<Self::Conn>>, sql: &str, params: &Vec<ParamValue>) -> Result<u64, DatabaseError> {
83 let sql = sql.to_string();
84 let params = params.clone();
85 let conn = conn.lock().await;
86 conn.interact(move |conn| {
87 let param_refs = ParamValueWrapper::convert_param_values(¶ms)?;
88 let to_sql_values = param_refs.iter().map(|x| x.as_sql_param()).collect::<Result<Vec<_>, DatabaseError>>()?;
89 let res = conn.execute(sql.as_str(), &*to_sql_values).map_err(|e| DatabaseError::ExecuteError(format!("Failed to execute statement: {:?}", e)))?;
90 Ok(res as u64)
91 }).await.map_err(|e| DatabaseError::ExecuteError(format!("Database interaction failed: {:?}", e)))?
92 }
93
94 fn get_conn_ref(&self) -> Result<Arc<Mutex<Self::Conn>>, DatabaseError> {
95 let c = SQLITE_CONN_REGISTER.try_get();
96 if c.is_err() {
97 return Err(DatabaseError::AccessError("SQLITE_CONN_REGISTER is not set".to_string()));
98 }
99 Ok(c.unwrap())
100 }
101
102 async fn get_conn(&self) -> Result<Self::Conn,DatabaseError> {
103 let p:Arc<DbManager<Pool>> = DbManager::get_instance(get_datasource_name().as_str())?;
104 let conn = p.get_pool().get().await.map_err(|e| DatabaseError::ConnectCanNotGetError(format!("Failed to get database connection: {:?}", e)))?;
105 Ok(conn)
106 }
107
108 async fn start_transaction(&self) -> Result<(), DatabaseError> {
109 let conn = self.get_conn_ref()?;
110 let conn = conn.lock().await;
111 conn.interact(move |conn| {
112 conn.execute("BEGIN TRANSACTION", []).map_err(|e| DatabaseError::ExecuteError(format!("Failed to start transaction: {:?}", e)))
113 }).await.map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
114 Ok(())
115 }
116
117 async fn commit(&self) -> Result<(), DatabaseError> {
118 let conn = self.get_conn_ref()?;
119 let conn = conn.lock().await;
120 conn.interact(move |conn| {
121 conn.execute("COMMIT", []).map_err(|e| DatabaseError::ExecuteError(format!("Failed to start transaction: {:?}", e)))
122 }).await.map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
123 Ok(())
124 }
125
126 async fn rollback(&self) -> Result<(), DatabaseError> {
127 let conn = self.get_conn_ref()?;
128 let conn = conn.lock().await;
129 conn.interact(move |conn| {
130 conn.execute("ROLLBACK", []).map_err(|e| DatabaseError::ExecuteError(format!("Failed to rollback transaction: {:?}", e)))
131 }).await.map_err(|e| DatabaseError::AccessError(format!("Failed to lock database connection: {:?}", e)))??;
132 Ok(())
133 }
134
135 async fn transaction_basic_exec<F, T, Fut>(&self, func: F) -> Result<T, DatabaseError>
136 where
137 F: FnOnce() -> Fut,
138 Fut: Future<Output=Result<T, DatabaseError>>
139 {
140 with_conn_scope!(SQLITE_CONN_REGISTER, self, func)
141 }
142
143 }
156
157struct ParamValueWrapper(ParamValue);
158
159impl ParamValueWrapper {
160
161 fn convert_param_values(param_values: &Vec<ParamValue>) -> Result<Vec<ParamValueWrapper>,DatabaseError> {
162 param_values.iter().map(|param_value: &ParamValue|{
163 match param_value {
164 ParamValue::U64(x) => Ok(ParamValueWrapper(ParamValue::I64(*x as i64))),
165 ParamValue::U32(x) => Ok(ParamValueWrapper(ParamValue::U32(*x ))),
166 ParamValue::U16(x) => Ok(ParamValueWrapper(ParamValue::U16(*x))),
167 ParamValue::U8(x) => Ok(ParamValueWrapper(ParamValue::U8(*x))) ,
168 ParamValue::I64(x) => Ok(ParamValueWrapper(ParamValue::I64(*x))),
169 ParamValue::I32(x) => Ok(ParamValueWrapper(ParamValue::I32(*x))),
170 ParamValue::I16(x) => Ok(ParamValueWrapper(ParamValue::I16(*x))),
171 ParamValue::I8(x) => Ok(ParamValueWrapper(ParamValue::I8(*x))),
172 ParamValue::String(x) => Ok(ParamValueWrapper(ParamValue::String(x.to_string()))),
173 ParamValue::F32(x) => Ok(ParamValueWrapper(ParamValue::F32(*x))),
174 ParamValue::F64(x) => Ok(ParamValueWrapper(ParamValue::F64(*x))),
175 ParamValue::Bool(x) => Ok(ParamValueWrapper(ParamValue::Bool(*x))),
176 ParamValue::Blob(x) => Ok(ParamValueWrapper(ParamValue::Blob(x.to_vec()))),
177 ParamValue::Clob(x) => Ok(ParamValueWrapper(ParamValue::String(String::from_utf8(x.to_vec()).unwrap()))),
178 ParamValue::DateTime(x) => Ok(ParamValueWrapper(ParamValue::I64(x.timestamp()))),
179 _ => Err(DatabaseError::ConvertError(format!("Can't Convert Postgres Error: {:?}", param_value)))
180 }
181 }).collect()
182 }
183 fn as_sql_param(&self) -> Result<&dyn ToSql, DatabaseError> {
184 match &self.0 {
185 ParamValue::Null => Ok(&rusqlite::types::Null),
186 ParamValue::I64(v) => Ok(v),
187 ParamValue::I32(v) => Ok(v),
188 ParamValue::I16(v) => Ok(v),
189 ParamValue::I8(v) => Ok(v),
190 ParamValue::String(v) => Ok(v ),
191 ParamValue::F64(v) => Ok(v ) ,
192 ParamValue::F32(v) => Ok(v),
193 ParamValue::Bool(v) => Ok(v),
194 ParamValue::Blob(v) => Ok(v),
195 ParamValue::Clob(v) => Ok(v),
196 ParamValue::U32(v) => Ok(v),
197 ParamValue::U16(v) => Ok(v),
198 ParamValue::U8(v) => Ok(v),
199 _ => Err(DatabaseError::ConvertError(format!("Can't Convert Sqlite Error: {:?}", self.0)))
200 }
201 }
202
203}
204
205
206fn value_to_param_value(value: ValueRef<'_>) -> Result<ParamValue, DatabaseError> {
207 let param_value;
208 match value {
209 ValueRef::Null => param_value = ParamValue::Null,
210 ValueRef::Integer(v) => param_value = ParamValue::I64(v),
211 ValueRef::Real(v) => param_value = ParamValue::F64(v),
212 ValueRef::Text(v) => {
213 let s = String::from_utf8(v.to_vec());
214 match s {
215 Ok(s) => param_value = ParamValue::String(s),
216 Err(e) => {
217 return Err(DatabaseError::CommonError(format!("字符串转换异常: {}", e)));
218 }
219 }
220 }
221 ValueRef::Blob(v) => param_value = ParamValue::Blob(v.to_vec()),
222 }
223 Ok(param_value)
224}