sql_middleware/
sqlite.rs

1use deadpool_sqlite::rusqlite::ParamsFromIter;
2use deadpool_sqlite::Object;
3use deadpool_sqlite::{rusqlite, Config as DeadpoolSqliteConfig, Runtime};
4use rusqlite::types::Value;
5use rusqlite::Statement;
6use rusqlite::ToSql;
7
8use crate::middleware::{
9    ConfigAndPool, ConversionMode, CustomDbRow, DatabaseType, MiddlewarePool, ParamConverter,
10    ResultSet, RowValues, SqlMiddlewareDbError,
11};
12
13// influenced design: https://tedspence.com/investigating-rust-with-sqlite-53d1f9a41112, https://www.powersync.com/blog/sqlite-optimizations-for-ultra-high-performance
14
15impl ConfigAndPool {
16    /// Asynchronous initializer for ConfigAndPool with Sqlite using deadpool_sqlite
17    pub async fn new_sqlite(db_path: String) -> Result<Self, SqlMiddlewareDbError> {
18        // Configure deadpool_sqlite
19        let cfg: DeadpoolSqliteConfig = DeadpoolSqliteConfig::new(db_path.clone());
20
21        // Create the pool
22        let pool = cfg.create_pool(Runtime::Tokio1)
23            .map_err(|e| SqlMiddlewareDbError::ConnectionError(
24                format!("Failed to create SQLite pool: {}", e)
25            ))?;
26
27        // Initialize the database (e.g., create tables)
28        {
29            let conn = pool
30                .get()
31                .await
32                .map_err(SqlMiddlewareDbError::PoolErrorSqlite)?;
33            let _res = conn
34                .interact(|conn| {
35                    conn.execute_batch(
36                        "
37                    PRAGMA journal_mode = WAL;
38                ",
39                    )
40                    .map_err(SqlMiddlewareDbError::SqliteError)
41                })
42                .await?;
43        }
44
45        Ok(ConfigAndPool {
46            pool: MiddlewarePool::Sqlite(pool),
47            db_type: DatabaseType::Sqlite,
48        })
49    }
50}
51
52// The #[from] attribute on the SqlMiddlewareDbError::SqliteError variant  
53// automatically generates this implementation
54
55/// Convert InteractError to a more specific SqlMiddlewareDbError
56impl From<deadpool_sqlite::InteractError> for SqlMiddlewareDbError {
57    fn from(err: deadpool_sqlite::InteractError) -> Self {
58        SqlMiddlewareDbError::ConnectionError(format!("SQLite Interact Error: {}", err))
59    }
60}
61
62/// Bind middleware params to SQLite types.
63pub fn convert_params(
64    params: &[RowValues],
65) -> Result<Vec<rusqlite::types::Value>, SqlMiddlewareDbError> {
66    let mut vec_values = Vec::with_capacity(params.len());
67    for p in params {
68        let v = match p {
69            RowValues::Int(i) => rusqlite::types::Value::Integer(*i),
70            RowValues::Float(f) => rusqlite::types::Value::Real(*f),
71            RowValues::Text(s) => rusqlite::types::Value::Text(s.clone()), // We need to clone but we use clone() explicitly
72            RowValues::Bool(b) => rusqlite::types::Value::Integer(*b as i64),
73            RowValues::Timestamp(dt) => {
74                let formatted = dt.format("%F %T%.f").to_string(); // Conversion needs to allocate
75                rusqlite::types::Value::Text(formatted)
76            }
77            RowValues::Null => rusqlite::types::Value::Null,
78            RowValues::JSON(jval) => {
79                // Only serialize once to avoid multiple allocations
80                let json_str = jval.to_string();
81                rusqlite::types::Value::Text(json_str)
82            },
83            RowValues::Blob(bytes) => {
84                // Only clone if we need to - this is unavoidable with rusqlite::Value
85                rusqlite::types::Value::Blob(bytes.to_vec())
86            },
87        };
88        vec_values.push(v);
89    }
90    Ok(vec_values)
91}
92
93/// Convert parameters for execution operations
94pub fn convert_params_for_execute<I>(
95    iter: I,
96) -> Result<ParamsFromIter<std::vec::IntoIter<Value>>, SqlMiddlewareDbError>
97where
98    I: IntoIterator<Item = RowValues>,
99{
100    let params_vec: Vec<RowValues> = iter.into_iter().collect();
101    let x = convert_params(&params_vec)?;
102    Ok(rusqlite::params_from_iter(x.into_iter()))
103}
104
105/// Wrapper for SQLite parameters for queries.
106pub struct SqliteParamsQuery(pub Vec<rusqlite::types::Value>);
107
108/// Wrapper for SQLite parameters for execution.
109pub struct SqliteParamsExecute(
110    pub rusqlite::ParamsFromIter<std::vec::IntoIter<rusqlite::types::Value>>,
111);
112
113impl<'a> ParamConverter<'a> for SqliteParamsQuery {
114    type Converted = Self;
115
116    fn convert_sql_params(
117        params: &[RowValues],
118        mode: ConversionMode,
119    ) -> Result<Self::Converted, SqlMiddlewareDbError> {
120        match mode {
121            // For a query, use the conversion that returns a Vec<Value>
122            ConversionMode::Query => convert_params(params).map(SqliteParamsQuery),
123            // Or, if you really want to support execution mode with this type, you might decide how to handle it:
124            ConversionMode::Execute => {
125                // For example, you could also call the "query" conversion here or return an error.
126                convert_params(params).map(SqliteParamsQuery)
127            }
128        }
129    }
130    
131    fn supports_mode(mode: ConversionMode) -> bool {
132        // This converter is primarily for query operations
133        mode == ConversionMode::Query
134    }
135}
136
137impl<'a> ParamConverter<'a> for SqliteParamsExecute {
138    type Converted = Self;
139
140    fn convert_sql_params(
141        params: &[RowValues],
142        mode: ConversionMode,
143    ) -> Result<Self::Converted, SqlMiddlewareDbError> {
144        match mode {
145            ConversionMode::Execute => {
146                convert_params_for_execute(params.to_vec()).map(SqliteParamsExecute)
147            }
148            // For queries you might not support the "execute" wrapper:
149            ConversionMode::Query => Err(SqlMiddlewareDbError::ParameterError(
150                "SqliteParamsExecute can only be used with Execute mode".into(),
151            )),
152        }
153    }
154    
155    fn supports_mode(mode: ConversionMode) -> bool {
156        // This converter is only for execution operations
157        mode == ConversionMode::Execute
158    }
159}
160
161/// Extract a RowValues from a SQLite row
162fn sqlite_extract_value_sync(
163    row: &rusqlite::Row,
164    idx: usize,
165) -> Result<RowValues, SqlMiddlewareDbError> {
166    let val_ref_res = row.get_ref(idx);
167    match val_ref_res {
168        Err(e) => Err(SqlMiddlewareDbError::SqliteError(e)),
169        Ok(rusqlite::types::ValueRef::Null) => Ok(RowValues::Null),
170        Ok(rusqlite::types::ValueRef::Integer(i)) => Ok(RowValues::Int(i)),
171        Ok(rusqlite::types::ValueRef::Real(f)) => Ok(RowValues::Float(f)),
172        Ok(rusqlite::types::ValueRef::Text(bytes)) => {
173            let s = String::from_utf8_lossy(bytes).into_owned();
174            Ok(RowValues::Text(s))
175        }
176        Ok(rusqlite::types::ValueRef::Blob(b)) => Ok(RowValues::Blob(b.to_vec())),
177    }
178}
179
180/// Build a result set from a SQLite query
181/// Only SELECT queries return rows affected. If a DML is sent, it does run it.
182/// If there's more than one query in the statement, idk which statement will be run.
183pub fn build_result_set(
184    stmt: &mut Statement,
185    params: &[Value],
186) -> Result<ResultSet, SqlMiddlewareDbError> {
187    let param_refs: Vec<&dyn ToSql> = params.iter().map(|v| v as &dyn ToSql).collect();
188    let column_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
189    
190    // Store column names once in the result set
191    let column_names_rc = std::sync::Arc::new(column_names);
192
193    let mut rows_iter = stmt.query(&param_refs[..])?;
194    // Create result set with default capacity
195    let mut result_set = ResultSet::with_capacity(10);
196
197    while let Some(row) = rows_iter.next()? {
198        let mut row_values = Vec::new();
199
200        for i in 0..column_names_rc.len() {
201            let value = sqlite_extract_value_sync(row, i)?;
202            row_values.push(value);
203        }
204
205        result_set.add_row(CustomDbRow::new(column_names_rc.clone(), row_values));
206    }
207
208    Ok(result_set)
209}
210
211/// Execute a batch of SQL statements for SQLite
212pub async fn execute_batch(
213    sqlite_client: &Object,
214    query: &str,
215) -> Result<(), SqlMiddlewareDbError> {
216    let query_owned = query.to_owned();
217
218    // Use interact to run the blocking code in a separate thread.
219    sqlite_client
220        .interact(move |conn| -> rusqlite::Result<()> {
221            // Begin a transaction
222            let tx = conn.transaction()?;
223
224            // Execute the batch of queries
225            tx.execute_batch(&query_owned)?;
226
227            // Commit the transaction
228            tx.commit()?;
229
230            Ok(())
231        })
232        .await
233        .map_err(|e| SqlMiddlewareDbError::ConnectionError(format!("Interact error: {}", e)))
234        .and_then(|res| res.map_err(SqlMiddlewareDbError::SqliteError))
235}
236
237/// Execute a SELECT query in SQLite
238pub async fn execute_select(
239    sqlite_client: &Object,
240    query: &str,
241    params: &[RowValues],
242) -> Result<ResultSet, SqlMiddlewareDbError> {
243    let query_owned = query.to_owned();
244    let params_owned = convert_params(params)?;
245
246    // Use interact to run the blocking code in a separate thread.
247    let result = sqlite_client
248        .interact(move |conn| {
249            // Prepare the query
250            let mut stmt = conn.prepare(&query_owned)?;
251
252            // Execute the query
253            build_result_set(&mut stmt, &params_owned)
254        })
255        .await
256        .map_err(|e| SqlMiddlewareDbError::ConnectionError(format!("Interact error: {}", e)))?;
257
258    result
259}
260
261/// Execute a DML query (INSERT, UPDATE, DELETE) in SQLite
262pub async fn execute_dml(
263    sqlite_client: &Object,
264    query: &str,
265    params: &[RowValues],
266) -> Result<usize, SqlMiddlewareDbError> {
267    let query_owned = query.to_owned();
268    let params_owned = convert_params(params)?;
269
270    // Use interact to run the blocking code in a separate thread.
271    sqlite_client
272        .interact(move |conn| -> rusqlite::Result<usize> {
273            // Prepare the query
274            let tx = conn.transaction()?;
275            // Execute the query
276            let param_refs: Vec<&dyn ToSql> =
277                params_owned.iter().map(|v| v as &dyn ToSql).collect();
278            let rows = {
279                let mut stmt = tx.prepare(&query_owned)?;
280                stmt.execute(&param_refs[..])?
281            };
282            tx.commit()?;
283
284            Ok(rows)
285        })
286        .await
287        .map_err(|e| SqlMiddlewareDbError::ConnectionError(format!("Interact error: {}", e)))
288        .and_then(|res| res.map_err(SqlMiddlewareDbError::SqliteError))
289}