Skip to main content

dataprof_db/connectors/
sqlite.rs

1//! SQLite database connector (embedded database)
2
3#[cfg(not(feature = "sqlite"))]
4use super::common::feature_not_enabled_error;
5#[cfg(feature = "sqlite")]
6use super::common::{build_count_query, not_connected_error};
7use crate::connection::ConnectionInfo;
8#[cfg(feature = "sqlite")]
9use crate::security::validate_sql_identifier;
10use crate::{DataProfilerError, DatabaseConfig, DatabaseConnector};
11#[cfg(feature = "sqlite")]
12use crate::{process_rows_to_columns, streaming_profile_loop};
13use async_trait::async_trait;
14use std::collections::HashMap;
15
16#[cfg(feature = "sqlite")]
17use {sqlx::sqlite::SqlitePool, sqlx::sqlite::SqlitePoolOptions};
18
/// SQLite embedded database connector.
///
/// Holds the caller's configuration and its parsed connection info; the
/// connection pool is created by `connect` and released by `disconnect`.
pub struct SqliteConnector {
    // Only read by the `sqlite`-feature code paths (the acquire timeout in
    // `connect`), so it is otherwise dead when the feature is disabled.
    #[allow(dead_code)]
    config: DatabaseConfig,
    // Parsed form of `config.connection_string`; read by `get_db_path`, which
    // is itself only called when the `sqlite` feature is enabled.
    #[allow(dead_code)]
    connection_info: ConnectionInfo,
    // `None` until `connect` succeeds; reset to `None` by `disconnect`.
    #[cfg(feature = "sqlite")]
    pool: Option<SqlitePool>,
    // Placeholder so the struct keeps a `pool` field without the `sqlite`
    // feature; it is initialised to `None` and never written afterwards.
    #[cfg(not(feature = "sqlite"))]
    #[allow(dead_code)]
    pool: Option<()>,
}
31
32impl SqliteConnector {
33    /// Create a new SQLite connector
34    pub fn new(config: DatabaseConfig) -> Result<Self, DataProfilerError> {
35        let connection_info = ConnectionInfo::parse(&config.connection_string)?;
36
37        if connection_info.database_type() != "sqlite" {
38            return Err(DataProfilerError::DatabaseConfigError {
39                message: format!(
40                    "Invalid connection string for SQLite: {}",
41                    config.connection_string
42                ),
43            });
44        }
45
46        Ok(Self {
47            config,
48            connection_info,
49            pool: None,
50        })
51    }
52
53    /// Get the database file path
54    #[allow(dead_code)]
55    fn get_db_path(&self) -> Result<String, DataProfilerError> {
56        if let Some(path) = &self.connection_info.path {
57            Ok(path.clone())
58        } else if let Some(db) = &self.connection_info.database {
59            Ok(db.clone())
60        } else {
61            Err(DataProfilerError::DatabaseConfigError {
62                message: "No database path specified for SQLite".to_string(),
63            })
64        }
65    }
66}
67
68#[async_trait]
69impl DatabaseConnector for SqliteConnector {
70    async fn connect(&mut self) -> Result<(), DataProfilerError> {
71        #[cfg(feature = "sqlite")]
72        {
73            let db_path = self.get_db_path()?;
74            let connection_string = if db_path == ":memory:" {
75                "sqlite::memory:".to_string()
76            } else {
77                format!("sqlite://{}", db_path)
78            };
79
80            let pool = SqlitePoolOptions::new()
81                .max_connections(1)
82                .acquire_timeout(
83                    self.config
84                        .connection_timeout
85                        .unwrap_or(std::time::Duration::from_secs(30)),
86                )
87                .connect(&connection_string)
88                .await
89                .map_err(|e| {
90                    DataProfilerError::database_connection(&format!(
91                        "Failed to connect to SQLite: {}",
92                        e
93                    ))
94                })?;
95
96            self.pool = Some(pool);
97            Ok(())
98        }
99
100        #[cfg(not(feature = "sqlite"))]
101        {
102            Err(DataProfilerError::database_feature_disabled(
103                "SQLite", "sqlite",
104            ))
105        }
106    }
107
108    async fn disconnect(&mut self) -> Result<(), DataProfilerError> {
109        #[cfg(feature = "sqlite")]
110        {
111            if let Some(pool) = &self.pool {
112                pool.close().await;
113                self.pool = None;
114            }
115        }
116        Ok(())
117    }
118
119    #[allow(unused_variables)]
120    async fn profile_query(
121        &mut self,
122        query: &str,
123    ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
124        #[cfg(feature = "sqlite")]
125        {
126            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
127
128            let rows = sqlx::query(query).fetch_all(pool).await.map_err(|e| {
129                DataProfilerError::database_query(&format!("Query execution failed: {}", e))
130            })?;
131
132            Ok(process_rows_to_columns!(rows))
133        }
134
135        #[cfg(not(feature = "sqlite"))]
136        Err(feature_not_enabled_error("SQLite", "sqlite"))
137    }
138
139    #[allow(unused_variables)]
140    async fn profile_query_streaming(
141        &mut self,
142        query: &str,
143        batch_size: usize,
144    ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
145        #[cfg(feature = "sqlite")]
146        {
147            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
148
149            let count_query = build_count_query(query)?;
150            let total_rows: i64 = sqlx::query_scalar(&count_query)
151                .fetch_one(pool)
152                .await
153                .map_err(|e| {
154                    DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
155                })?;
156
157            streaming_profile_loop!(pool, query, batch_size, total_rows, "SQLite")
158        }
159
160        #[cfg(not(feature = "sqlite"))]
161        Err(feature_not_enabled_error("SQLite", "sqlite"))
162    }
163
164    #[allow(unused_variables)]
165    async fn get_table_schema(
166        &mut self,
167        table_name: &str,
168    ) -> Result<Vec<String>, DataProfilerError> {
169        #[cfg(feature = "sqlite")]
170        {
171            use sqlx::Row;
172
173            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
174
175            let query = format!("PRAGMA table_info({})", table_name);
176
177            let rows = sqlx::query(&query).fetch_all(pool).await.map_err(|e| {
178                DataProfilerError::database_query(&format!("Failed to get table schema: {}", e))
179            })?;
180
181            let mut columns = Vec::new();
182            for row in rows {
183                let column_name: String = row.try_get(1).map_err(|e| {
184                    DataProfilerError::database_query(&format!("Failed to read column name: {}", e))
185                })?;
186                columns.push(column_name);
187            }
188
189            Ok(columns)
190        }
191
192        #[cfg(not(feature = "sqlite"))]
193        Err(feature_not_enabled_error("SQLite", "sqlite"))
194    }
195
196    #[allow(unused_variables)]
197    async fn count_table_rows(&mut self, table_name: &str) -> Result<u64, DataProfilerError> {
198        #[cfg(feature = "sqlite")]
199        {
200            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
201
202            validate_sql_identifier(table_name)?;
203            let query = format!("SELECT COUNT(*) FROM {}", table_name);
204            let count: i64 = sqlx::query_scalar(&query)
205                .fetch_one(pool)
206                .await
207                .map_err(|e| {
208                    DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
209                })?;
210
211            Ok(count as u64)
212        }
213
214        #[cfg(not(feature = "sqlite"))]
215        Err(feature_not_enabled_error("SQLite", "sqlite"))
216    }
217
218    async fn test_connection(&mut self) -> Result<bool, DataProfilerError> {
219        #[cfg(feature = "sqlite")]
220        {
221            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
222
223            let result: i32 = sqlx::query_scalar("SELECT 1")
224                .fetch_one(pool)
225                .await
226                .map_err(|e| {
227                    DataProfilerError::database_query(&format!("Connection test failed: {}", e))
228                })?;
229
230            Ok(result == 1)
231        }
232
233        #[cfg(not(feature = "sqlite"))]
234        Err(feature_not_enabled_error("SQLite", "sqlite"))
235    }
236}