bubble_db/
sqlite.rs

1use crate::{DatabaseConfig, DatabaseConnection, DbResult};
2use async_trait::async_trait;
3use rusqlite::{Connection, Row};
4use std::collections::HashMap;
5use tokio::sync::Mutex;
6
7#[derive(Debug)]
8pub struct SqliteConnection {
9    conn: Mutex<Connection>,
10}
11
12impl SqliteConnection {
13    pub async fn connect(config: &DatabaseConfig) -> DbResult<Self> {
14        let conn = Connection::open(&config.database).map_err(|e| e.to_string())?;
15        Ok(Self {
16            conn: Mutex::new(conn),
17        })
18    }
19
20    fn row_to_map(row: &Row) -> DbResult<HashMap<String, String>> {
21        let mut map = HashMap::new();
22        for (i, column) in row.as_ref().column_names().iter().enumerate() {
23            let name = column.to_string();
24            let value: String = row.get(i).unwrap_or_default();
25            map.insert(name, value);
26        }
27        Ok(map)
28    }
29}
30
31#[async_trait]
32impl DatabaseConnection for SqliteConnection {
33    async fn execute(&self, sql: &str) -> DbResult<u64> {
34        let mut conn = self.conn.lock().await;
35        conn.execute(sql, [])
36            .map(|n| n as u64)
37            .map_err(|e| e.to_string())
38    }
39
40    async fn query(&self, sql: &str) -> DbResult<String> {
41        let conn = self.conn.lock().await;
42        let mut stmt = conn.prepare(sql).map_err(|e| e.to_string())?;
43        let rows = stmt.query([]).map_err(|e| e.to_string())?;
44        let mut results = Vec::new();
45        let mut rows_iter = rows;
46        while let Some(row) = rows_iter.next().map_err(|e| e.to_string())? {
47            let map = Self::row_to_map(&row)?;
48            results.push(map);
49        }
50        serde_json::to_string(&results).map_err(|e| e.to_string())
51    }
52
53    async fn query_one(&self, sql: &str) -> DbResult<String> {
54        let conn = self.conn.lock().await;
55        let mut stmt = conn.prepare(sql).map_err(|e| e.to_string())?;
56        let mut rows = stmt.query([]).map_err(|e| e.to_string())?;
57
58        if let Some(row) = rows.next().map_err(|e| e.to_string())? {
59            let map = Self::row_to_map(&row)?;
60            serde_json::to_string(&map).map_err(|e| e.to_string())
61        } else {
62            Err("No rows found".to_string())
63        }
64    }
65
66    async fn insert_batch(&self, table: &str, json_data: &str) -> DbResult<u64> {
67        let items: Vec<serde_json::Value> = serde_json::from_str(json_data)
68            .map_err(|e| format!("Failed to parse JSON data: {}", e))?;
69
70        if items.is_empty() {
71            return Ok(0);
72        }
73        let mut conn = self.conn.lock().await;
74        let tx = conn.transaction().map_err(|e| e.to_string())?;
75        for item in items.iter() {
76            let value = crate::to_sql_value(item)?;
77            let sql = format!("INSERT INTO {} VALUES ({})", table, value);
78            tx.execute(&sql, []).map_err(|e| e.to_string())?;
79        }
80        tx.commit().map_err(|e| e.to_string())?;
81        Ok(items.len() as u64)
82    }
83}