prax_sqlite/
engine.rs

1//! SQLite query engine implementation.
2
3use std::collections::HashMap;
4
5use rusqlite::types::Value;
6use serde_json::Value as JsonValue;
7use tracing::{debug, instrument};
8
9use prax_query::filter::FilterValue;
10use prax_query::types::SortOrder;
11
12use crate::error::SqliteError;
13use crate::pool::SqlitePool;
14use crate::types::filter_value_to_sqlite;
15
16/// SQLite query engine.
17#[derive(Clone)]
18pub struct SqliteEngine {
19    pool: SqlitePool,
20}
21
22/// Result of a query operation.
23#[derive(Debug, Clone)]
24pub struct SqliteQueryResult {
25    /// The result data as JSON.
26    pub data: JsonValue,
27}
28
29impl SqliteQueryResult {
30    /// Create a new query result.
31    pub fn new(data: JsonValue) -> Self {
32        Self { data }
33    }
34
35    /// Get the result as JSON.
36    pub fn json(&self) -> &JsonValue {
37        &self.data
38    }
39
40    /// Convert to the inner JSON value.
41    pub fn into_json(self) -> JsonValue {
42        self.data
43    }
44}
45
46impl SqliteEngine {
47    /// Create a new SQLite engine with the given pool.
48    pub fn new(pool: SqlitePool) -> Self {
49        Self { pool }
50    }
51
52    /// Get a reference to the connection pool.
53    pub fn pool(&self) -> &SqlitePool {
54        &self.pool
55    }
56
57    /// Build a SELECT query.
58    fn build_select(
59        &self,
60        table: &str,
61        columns: &[String],
62        filters: &HashMap<String, FilterValue>,
63        sort: &[(String, SortOrder)],
64        limit: Option<u64>,
65        offset: Option<u64>,
66    ) -> (String, Vec<Value>) {
67        let mut sql = String::new();
68        let mut params: Vec<Value> = Vec::new();
69
70        // SELECT clause
71        let cols = if columns.is_empty() {
72            "*".to_string()
73        } else {
74            columns
75                .iter()
76                .map(|c| format!("\"{}\"", c))
77                .collect::<Vec<_>>()
78                .join(", ")
79        };
80        sql.push_str(&format!("SELECT {} FROM \"{}\"", cols, table));
81
82        // WHERE clause
83        if !filters.is_empty() {
84            let mut conditions = Vec::new();
85            for (field, value) in filters {
86                match value {
87                    FilterValue::Null => {
88                        conditions.push(format!("\"{}\" IS NULL", field));
89                    }
90                    _ => {
91                        conditions.push(format!("\"{}\" = ?", field));
92                        params.push(filter_value_to_sqlite(value));
93                    }
94                }
95            }
96            sql.push_str(" WHERE ");
97            sql.push_str(&conditions.join(" AND "));
98        }
99
100        // ORDER BY clause
101        if !sort.is_empty() {
102            let order_parts: Vec<String> = sort
103                .iter()
104                .map(|(col, dir)| {
105                    let direction = match dir {
106                        SortOrder::Asc => "ASC",
107                        SortOrder::Desc => "DESC",
108                    };
109                    format!("\"{}\" {}", col, direction)
110                })
111                .collect();
112            sql.push_str(" ORDER BY ");
113            sql.push_str(&order_parts.join(", "));
114        }
115
116        // LIMIT and OFFSET
117        if let Some(lim) = limit {
118            sql.push_str(&format!(" LIMIT {}", lim));
119        }
120        if let Some(off) = offset {
121            sql.push_str(&format!(" OFFSET {}", off));
122        }
123
124        (sql, params)
125    }
126
127    /// Build an INSERT query.
128    fn build_insert(
129        &self,
130        table: &str,
131        data: &HashMap<String, FilterValue>,
132    ) -> (String, Vec<Value>) {
133        let mut columns = Vec::new();
134        let mut placeholders = Vec::new();
135        let mut params: Vec<Value> = Vec::new();
136
137        for (col, val) in data {
138            columns.push(format!("\"{}\"", col));
139            placeholders.push("?".to_string());
140            params.push(filter_value_to_sqlite(val));
141        }
142
143        let sql = format!(
144            "INSERT INTO \"{}\" ({}) VALUES ({})",
145            table,
146            columns.join(", "),
147            placeholders.join(", ")
148        );
149
150        (sql, params)
151    }
152
153    /// Build an UPDATE query.
154    fn build_update(
155        &self,
156        table: &str,
157        data: &HashMap<String, FilterValue>,
158        filters: &HashMap<String, FilterValue>,
159    ) -> (String, Vec<Value>) {
160        let mut params: Vec<Value> = Vec::new();
161
162        // SET clause
163        let set_parts: Vec<String> = data
164            .iter()
165            .map(|(col, val)| {
166                params.push(filter_value_to_sqlite(val));
167                format!("\"{}\" = ?", col)
168            })
169            .collect();
170
171        let mut sql = format!("UPDATE \"{}\" SET {}", table, set_parts.join(", "));
172
173        // WHERE clause
174        if !filters.is_empty() {
175            let mut conditions = Vec::new();
176            for (field, value) in filters {
177                match value {
178                    FilterValue::Null => {
179                        conditions.push(format!("\"{}\" IS NULL", field));
180                    }
181                    _ => {
182                        conditions.push(format!("\"{}\" = ?", field));
183                        params.push(filter_value_to_sqlite(value));
184                    }
185                }
186            }
187            sql.push_str(" WHERE ");
188            sql.push_str(&conditions.join(" AND "));
189        }
190
191        (sql, params)
192    }
193
194    /// Build a DELETE query.
195    fn build_delete(
196        &self,
197        table: &str,
198        filters: &HashMap<String, FilterValue>,
199    ) -> (String, Vec<Value>) {
200        let mut sql = format!("DELETE FROM \"{}\"", table);
201        let mut params: Vec<Value> = Vec::new();
202
203        if !filters.is_empty() {
204            let mut conditions = Vec::new();
205            for (field, value) in filters {
206                match value {
207                    FilterValue::Null => {
208                        conditions.push(format!("\"{}\" IS NULL", field));
209                    }
210                    _ => {
211                        conditions.push(format!("\"{}\" = ?", field));
212                        params.push(filter_value_to_sqlite(value));
213                    }
214                }
215            }
216            sql.push_str(" WHERE ");
217            sql.push_str(&conditions.join(" AND "));
218        }
219
220        (sql, params)
221    }
222
223    /// Execute a query and return multiple results.
224    #[instrument(skip(self, columns, filters, sort), fields(table = %table))]
225    pub async fn query_many(
226        &self,
227        table: &str,
228        columns: &[String],
229        filters: &HashMap<String, FilterValue>,
230        sort: &[(String, SortOrder)],
231        limit: Option<u64>,
232        offset: Option<u64>,
233    ) -> Result<Vec<SqliteQueryResult>, SqliteError> {
234        let (sql, params) = self.build_select(table, columns, filters, sort, limit, offset);
235        debug!(sql = %sql, "Executing query_many");
236
237        let conn = self.pool.get().await?;
238
239        let results = conn.query_params(&sql, params).await?;
240
241        Ok(results.into_iter().map(SqliteQueryResult::new).collect())
242    }
243
244    /// Execute a query and return a single result.
245    #[instrument(skip(self, columns, filters), fields(table = %table))]
246    pub async fn query_one(
247        &self,
248        table: &str,
249        columns: &[String],
250        filters: &HashMap<String, FilterValue>,
251    ) -> Result<SqliteQueryResult, SqliteError> {
252        let (sql, params) = self.build_select(table, columns, filters, &[], Some(1), None);
253        debug!(sql = %sql, "Executing query_one");
254
255        let conn = self.pool.get().await?;
256
257        let results = conn.query_params(&sql, params).await?;
258
259        results
260            .into_iter()
261            .next()
262            .map(SqliteQueryResult::new)
263            .ok_or_else(|| {
264                SqliteError::query(format!(
265                    "No row found in table '{}' with the given filters",
266                    table
267                ))
268            })
269    }
270
271    /// Execute a query and return an optional result.
272    #[instrument(skip(self, columns, filters), fields(table = %table))]
273    pub async fn query_optional(
274        &self,
275        table: &str,
276        columns: &[String],
277        filters: &HashMap<String, FilterValue>,
278    ) -> Result<Option<SqliteQueryResult>, SqliteError> {
279        let (sql, params) = self.build_select(table, columns, filters, &[], Some(1), None);
280        debug!(sql = %sql, "Executing query_optional");
281
282        let conn = self.pool.get().await?;
283
284        let results = conn.query_params(&sql, params).await?;
285
286        Ok(results.into_iter().next().map(SqliteQueryResult::new))
287    }
288
289    /// Execute an INSERT and return the result.
290    #[instrument(skip(self, data), fields(table = %table))]
291    pub async fn execute_insert(
292        &self,
293        table: &str,
294        data: &HashMap<String, FilterValue>,
295    ) -> Result<SqliteQueryResult, SqliteError> {
296        let (sql, params) = self.build_insert(table, data);
297        debug!(sql = %sql, "Executing insert");
298
299        let conn = self.pool.get().await?;
300
301        let last_rowid = conn.execute_insert_params(&sql, params).await?;
302
303        // Return the inserted row
304        let mut result = data.clone();
305        if !result.contains_key("id") {
306            result.insert("id".to_string(), FilterValue::Int(last_rowid));
307        }
308
309        let json = result
310            .into_iter()
311            .map(|(k, v)| (k, filter_value_to_json(&v)))
312            .collect::<serde_json::Map<_, _>>();
313
314        Ok(SqliteQueryResult::new(JsonValue::Object(json)))
315    }
316
317    /// Execute an UPDATE and return the number of affected rows.
318    #[instrument(skip(self, data, filters), fields(table = %table))]
319    pub async fn execute_update(
320        &self,
321        table: &str,
322        data: &HashMap<String, FilterValue>,
323        filters: &HashMap<String, FilterValue>,
324    ) -> Result<u64, SqliteError> {
325        let (sql, params) = self.build_update(table, data, filters);
326        debug!(sql = %sql, "Executing update");
327
328        let conn = self.pool.get().await?;
329
330        let affected = conn.execute_params(&sql, params).await?;
331
332        Ok(affected as u64)
333    }
334
335    /// Execute a DELETE and return the number of affected rows.
336    #[instrument(skip(self, filters), fields(table = %table))]
337    pub async fn execute_delete(
338        &self,
339        table: &str,
340        filters: &HashMap<String, FilterValue>,
341    ) -> Result<u64, SqliteError> {
342        let (sql, params) = self.build_delete(table, filters);
343        debug!(sql = %sql, "Executing delete");
344
345        let conn = self.pool.get().await?;
346
347        let affected = conn.execute_params(&sql, params).await?;
348
349        Ok(affected as u64)
350    }
351
352    /// Execute raw SQL and return results.
353    #[instrument(skip(self, params), fields(sql = %sql))]
354    pub async fn execute_raw(
355        &self,
356        sql: &str,
357        params: &[FilterValue],
358    ) -> Result<Vec<SqliteQueryResult>, SqliteError> {
359        debug!("Executing raw SQL");
360
361        let sqlite_params: Vec<Value> = params.iter().map(filter_value_to_sqlite).collect();
362
363        let conn = self.pool.get().await?;
364
365        let results = conn.query_params(sql, sqlite_params).await?;
366
367        Ok(results.into_iter().map(SqliteQueryResult::new).collect())
368    }
369
370    // =========================================================================
371    // Raw SQL Functions
372    // =========================================================================
373
374    /// Execute a raw SQL query using the `Sql` builder from prax-query.
375    ///
376    /// # Example
377    ///
378    /// ```rust,ignore
379    /// use prax_query::raw::Sql;
380    ///
381    /// let sql = Sql::new("SELECT * FROM users WHERE age > ")
382    ///     .bind(18)
383    ///     .push(" AND active = ")
384    ///     .bind(true);
385    ///
386    /// let results = engine.raw_sql(sql).await?;
387    /// ```
388    #[instrument(skip(self, sql))]
389    pub async fn raw_sql(
390        &self,
391        sql: prax_query::raw::Sql,
392    ) -> Result<Vec<SqliteQueryResult>, SqliteError> {
393        let (query_string, params) = sql.build();
394        debug!(sql = %query_string, "Executing raw SQL from builder");
395        self.raw_sql_query(&query_string, &params).await
396    }
397
398    /// Execute a raw SQL query string with parameters and return results.
399    ///
400    /// # Example
401    ///
402    /// ```rust,ignore
403    /// let results = engine.raw_sql_query(
404    ///     "SELECT * FROM users WHERE age > ? AND active = ?",
405    ///     &[FilterValue::Int(18), FilterValue::Bool(true)]
406    /// ).await?;
407    /// ```
408    #[instrument(skip(self, params), fields(sql = %sql))]
409    pub async fn raw_sql_query(
410        &self,
411        sql: &str,
412        params: &[FilterValue],
413    ) -> Result<Vec<SqliteQueryResult>, SqliteError> {
414        debug!("Executing raw SQL query");
415
416        let sqlite_params: Vec<Value> = params.iter().map(filter_value_to_sqlite).collect();
417
418        let conn = self.pool.get().await?;
419
420        let results = conn.query_params(sql, sqlite_params).await?;
421
422        Ok(results.into_iter().map(SqliteQueryResult::new).collect())
423    }
424
425    /// Execute a raw SQL statement and return the number of affected rows.
426    ///
427    /// Use this for INSERT, UPDATE, DELETE, or other statements that don't return rows.
428    ///
429    /// # Example
430    ///
431    /// ```rust,ignore
432    /// let affected = engine.raw_sql_execute(
433    ///     "UPDATE users SET last_login = datetime('now') WHERE id = ?",
434    ///     &[FilterValue::Int(user_id)]
435    /// ).await?;
436    /// println!("Updated {} rows", affected);
437    /// ```
438    #[instrument(skip(self, params), fields(sql = %sql))]
439    pub async fn raw_sql_execute(
440        &self,
441        sql: &str,
442        params: &[FilterValue],
443    ) -> Result<u64, SqliteError> {
444        debug!("Executing raw SQL statement");
445
446        let sqlite_params: Vec<Value> = params.iter().map(filter_value_to_sqlite).collect();
447
448        let conn = self.pool.get().await?;
449
450        let affected = conn.execute_params(sql, sqlite_params).await?;
451
452        Ok(affected as u64)
453    }
454
455    /// Execute a raw SQL query and return the first result.
456    ///
457    /// Returns an error if no rows are returned.
458    ///
459    /// # Example
460    ///
461    /// ```rust,ignore
462    /// let user = engine.raw_sql_first(
463    ///     "SELECT * FROM users WHERE id = ?",
464    ///     &[FilterValue::Int(user_id)]
465    /// ).await?;
466    /// ```
467    #[instrument(skip(self, params), fields(sql = %sql))]
468    pub async fn raw_sql_first(
469        &self,
470        sql: &str,
471        params: &[FilterValue],
472    ) -> Result<SqliteQueryResult, SqliteError> {
473        debug!("Executing raw SQL first");
474
475        let sqlite_params: Vec<Value> = params.iter().map(filter_value_to_sqlite).collect();
476
477        let conn = self.pool.get().await?;
478
479        let results = conn.query_params(sql, sqlite_params).await?;
480
481        results
482            .into_iter()
483            .next()
484            .map(SqliteQueryResult::new)
485            .ok_or_else(|| SqliteError::query("raw_sql_first returned no rows"))
486    }
487
488    /// Execute a raw SQL query and return the first result, or None if no rows.
489    ///
490    /// # Example
491    ///
492    /// ```rust,ignore
493    /// let user = engine.raw_sql_optional(
494    ///     "SELECT * FROM users WHERE email = ?",
495    ///     &[FilterValue::String("test@example.com".into())]
496    /// ).await?;
497    /// ```
498    #[instrument(skip(self, params), fields(sql = %sql))]
499    pub async fn raw_sql_optional(
500        &self,
501        sql: &str,
502        params: &[FilterValue],
503    ) -> Result<Option<SqliteQueryResult>, SqliteError> {
504        debug!("Executing raw SQL optional");
505
506        let sqlite_params: Vec<Value> = params.iter().map(filter_value_to_sqlite).collect();
507
508        let conn = self.pool.get().await?;
509
510        let results = conn.query_params(sql, sqlite_params).await?;
511
512        Ok(results.into_iter().next().map(SqliteQueryResult::new))
513    }
514
515    /// Execute a raw SQL query and return a single scalar value.
516    ///
517    /// # Example
518    ///
519    /// ```rust,ignore
520    /// let count: i64 = engine.raw_sql_scalar(
521    ///     "SELECT COUNT(*) FROM users WHERE active = ?",
522    ///     &[FilterValue::Bool(true)]
523    /// ).await?;
524    /// ```
525    #[instrument(skip(self, params), fields(sql = %sql))]
526    pub async fn raw_sql_scalar<T>(
527        &self,
528        sql: &str,
529        params: &[FilterValue],
530    ) -> Result<T, SqliteError>
531    where
532        T: for<'a> serde::Deserialize<'a>,
533    {
534        debug!("Executing raw SQL scalar");
535
536        let sqlite_params: Vec<Value> = params.iter().map(filter_value_to_sqlite).collect();
537
538        let conn = self.pool.get().await?;
539
540        let results = conn.query_params(sql, sqlite_params).await?;
541
542        let row = results
543            .into_iter()
544            .next()
545            .ok_or_else(|| SqliteError::query("raw_sql_scalar returned no rows"))?;
546
547        // Get the first column value
548        let value = row
549            .as_object()
550            .and_then(|obj| obj.values().next())
551            .ok_or_else(|| SqliteError::query("raw_sql_scalar returned empty row"))?;
552
553        serde_json::from_value(value.clone()).map_err(|e| {
554            SqliteError::deserialization(format!("failed to deserialize scalar: {}", e))
555        })
556    }
557
558    /// Execute multiple raw SQL statements in a batch.
559    ///
560    /// This is useful for running schema migrations or multiple DDL statements.
561    ///
562    /// # Example
563    ///
564    /// ```rust,ignore
565    /// engine.raw_sql_batch(r#"
566    ///     CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY);
567    ///     CREATE TABLE IF NOT EXISTS posts (id INTEGER PRIMARY KEY);
568    /// "#).await?;
569    /// ```
570    #[instrument(skip(self), fields(sql_len = %sql.len()))]
571    pub async fn raw_sql_batch(&self, sql: &str) -> Result<(), SqliteError> {
572        debug!("Executing raw SQL batch");
573
574        let conn = self.pool.get().await?;
575
576        conn.execute_batch(sql).await
577    }
578
579    /// Count rows matching the filter.
580    #[instrument(skip(self, filters), fields(table = %table))]
581    pub async fn count(
582        &self,
583        table: &str,
584        filters: &HashMap<String, FilterValue>,
585    ) -> Result<u64, SqliteError> {
586        let mut sql = format!("SELECT COUNT(*) as count FROM \"{}\"", table);
587        let mut params: Vec<Value> = Vec::new();
588
589        if !filters.is_empty() {
590            let mut conditions = Vec::new();
591            for (field, value) in filters {
592                match value {
593                    FilterValue::Null => {
594                        conditions.push(format!("\"{}\" IS NULL", field));
595                    }
596                    _ => {
597                        conditions.push(format!("\"{}\" = ?", field));
598                        params.push(filter_value_to_sqlite(value));
599                    }
600                }
601            }
602            sql.push_str(" WHERE ");
603            sql.push_str(&conditions.join(" AND "));
604        }
605
606        debug!(sql = %sql, "Executing count");
607
608        let conn = self.pool.get().await?;
609
610        let results = conn.query_params(&sql, params).await?;
611
612        // Extract count from first row
613        let count = results
614            .first()
615            .and_then(|row| row.get("count"))
616            .and_then(|v| v.as_i64())
617            .unwrap_or(0);
618
619        Ok(count as u64)
620    }
621}
622
623/// Convert a FilterValue to JSON.
624fn filter_value_to_json(value: &FilterValue) -> JsonValue {
625    match value {
626        FilterValue::Null => JsonValue::Null,
627        FilterValue::Bool(b) => JsonValue::Bool(*b),
628        FilterValue::Int(i) => JsonValue::Number((*i).into()),
629        FilterValue::Float(f) => serde_json::Number::from_f64(*f)
630            .map(JsonValue::Number)
631            .unwrap_or(JsonValue::Null),
632        FilterValue::String(s) => JsonValue::String(s.clone()),
633        FilterValue::Json(j) => j.clone(),
634        FilterValue::List(list) => {
635            JsonValue::Array(list.iter().map(filter_value_to_json).collect())
636        }
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643
644    #[test]
645    fn test_filter_value_to_json() {
646        assert_eq!(filter_value_to_json(&FilterValue::Null), JsonValue::Null);
647        assert_eq!(
648            filter_value_to_json(&FilterValue::Bool(true)),
649            JsonValue::Bool(true)
650        );
651        assert_eq!(
652            filter_value_to_json(&FilterValue::Int(42)),
653            JsonValue::Number(42.into())
654        );
655        assert_eq!(
656            filter_value_to_json(&FilterValue::String("test".to_string())),
657            JsonValue::String("test".to_string())
658        );
659    }
660
661    #[test]
662    fn test_build_select_simple() {
663        let sql = "SELECT * FROM \"users\"";
664        assert!(sql.contains("SELECT"));
665        assert!(sql.contains("users"));
666    }
667
668    #[test]
669    fn test_query_result() {
670        let result = SqliteQueryResult::new(JsonValue::Object(serde_json::Map::new()));
671        assert!(result.json().is_object());
672    }
673
674    #[test]
675    fn test_query_result_into_json() {
676        let json = JsonValue::Object(serde_json::Map::new());
677        let result = SqliteQueryResult::new(json.clone());
678        assert_eq!(result.into_json(), json);
679    }
680
681    #[test]
682    fn test_sql_builder_integration() {
683        use prax_query::raw::Sql;
684
685        let sql = Sql::new("SELECT * FROM users WHERE age > ")
686            .bind(18)
687            .push(" AND active = ")
688            .bind(true);
689
690        let (query, params) = sql.build();
691        assert!(query.contains("SELECT"));
692        assert!(query.contains("users"));
693        assert_eq!(params.len(), 2);
694    }
695}