dataprof_db/connectors/
sqlite.rs1#[cfg(not(feature = "sqlite"))]
4use super::common::feature_not_enabled_error;
5#[cfg(feature = "sqlite")]
6use super::common::{build_count_query, not_connected_error};
7use crate::connection::ConnectionInfo;
8#[cfg(feature = "sqlite")]
9use crate::security::validate_sql_identifier;
10use crate::{DataProfilerError, DatabaseConfig, DatabaseConnector};
11#[cfg(feature = "sqlite")]
12use crate::{process_rows_to_columns, streaming_profile_loop};
13use async_trait::async_trait;
14use std::collections::HashMap;
15
16#[cfg(feature = "sqlite")]
17use {sqlx::sqlite::SqlitePool, sqlx::sqlite::SqlitePoolOptions};
18
19pub struct SqliteConnector {
21 #[allow(dead_code)]
22 config: DatabaseConfig,
23 #[allow(dead_code)]
24 connection_info: ConnectionInfo,
25 #[cfg(feature = "sqlite")]
26 pool: Option<SqlitePool>,
27 #[cfg(not(feature = "sqlite"))]
28 #[allow(dead_code)]
29 pool: Option<()>,
30}
31
32impl SqliteConnector {
33 pub fn new(config: DatabaseConfig) -> Result<Self, DataProfilerError> {
35 let connection_info = ConnectionInfo::parse(&config.connection_string)?;
36
37 if connection_info.database_type() != "sqlite" {
38 return Err(DataProfilerError::DatabaseConfigError {
39 message: format!(
40 "Invalid connection string for SQLite: {}",
41 config.connection_string
42 ),
43 });
44 }
45
46 Ok(Self {
47 config,
48 connection_info,
49 pool: None,
50 })
51 }
52
53 #[allow(dead_code)]
55 fn get_db_path(&self) -> Result<String, DataProfilerError> {
56 if let Some(path) = &self.connection_info.path {
57 Ok(path.clone())
58 } else if let Some(db) = &self.connection_info.database {
59 Ok(db.clone())
60 } else {
61 Err(DataProfilerError::DatabaseConfigError {
62 message: "No database path specified for SQLite".to_string(),
63 })
64 }
65 }
66}
67
68#[async_trait]
69impl DatabaseConnector for SqliteConnector {
70 async fn connect(&mut self) -> Result<(), DataProfilerError> {
71 #[cfg(feature = "sqlite")]
72 {
73 let db_path = self.get_db_path()?;
74 let connection_string = if db_path == ":memory:" {
75 "sqlite::memory:".to_string()
76 } else {
77 format!("sqlite://{}", db_path)
78 };
79
80 let pool = SqlitePoolOptions::new()
81 .max_connections(1)
82 .acquire_timeout(
83 self.config
84 .connection_timeout
85 .unwrap_or(std::time::Duration::from_secs(30)),
86 )
87 .connect(&connection_string)
88 .await
89 .map_err(|e| {
90 DataProfilerError::database_connection(&format!(
91 "Failed to connect to SQLite: {}",
92 e
93 ))
94 })?;
95
96 self.pool = Some(pool);
97 Ok(())
98 }
99
100 #[cfg(not(feature = "sqlite"))]
101 {
102 Err(DataProfilerError::database_feature_disabled(
103 "SQLite", "sqlite",
104 ))
105 }
106 }
107
108 async fn disconnect(&mut self) -> Result<(), DataProfilerError> {
109 #[cfg(feature = "sqlite")]
110 {
111 if let Some(pool) = &self.pool {
112 pool.close().await;
113 self.pool = None;
114 }
115 }
116 Ok(())
117 }
118
119 #[allow(unused_variables)]
120 async fn profile_query(
121 &mut self,
122 query: &str,
123 ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
124 #[cfg(feature = "sqlite")]
125 {
126 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
127
128 let rows = sqlx::query(query).fetch_all(pool).await.map_err(|e| {
129 DataProfilerError::database_query(&format!("Query execution failed: {}", e))
130 })?;
131
132 Ok(process_rows_to_columns!(rows))
133 }
134
135 #[cfg(not(feature = "sqlite"))]
136 Err(feature_not_enabled_error("SQLite", "sqlite"))
137 }
138
139 #[allow(unused_variables)]
140 async fn profile_query_streaming(
141 &mut self,
142 query: &str,
143 batch_size: usize,
144 ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
145 #[cfg(feature = "sqlite")]
146 {
147 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
148
149 let count_query = build_count_query(query)?;
150 let total_rows: i64 = sqlx::query_scalar(&count_query)
151 .fetch_one(pool)
152 .await
153 .map_err(|e| {
154 DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
155 })?;
156
157 streaming_profile_loop!(pool, query, batch_size, total_rows, "SQLite")
158 }
159
160 #[cfg(not(feature = "sqlite"))]
161 Err(feature_not_enabled_error("SQLite", "sqlite"))
162 }
163
164 #[allow(unused_variables)]
165 async fn get_table_schema(
166 &mut self,
167 table_name: &str,
168 ) -> Result<Vec<String>, DataProfilerError> {
169 #[cfg(feature = "sqlite")]
170 {
171 use sqlx::Row;
172
173 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
174
175 let query = format!("PRAGMA table_info({})", table_name);
176
177 let rows = sqlx::query(&query).fetch_all(pool).await.map_err(|e| {
178 DataProfilerError::database_query(&format!("Failed to get table schema: {}", e))
179 })?;
180
181 let mut columns = Vec::new();
182 for row in rows {
183 let column_name: String = row.try_get(1).map_err(|e| {
184 DataProfilerError::database_query(&format!("Failed to read column name: {}", e))
185 })?;
186 columns.push(column_name);
187 }
188
189 Ok(columns)
190 }
191
192 #[cfg(not(feature = "sqlite"))]
193 Err(feature_not_enabled_error("SQLite", "sqlite"))
194 }
195
196 #[allow(unused_variables)]
197 async fn count_table_rows(&mut self, table_name: &str) -> Result<u64, DataProfilerError> {
198 #[cfg(feature = "sqlite")]
199 {
200 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
201
202 validate_sql_identifier(table_name)?;
203 let query = format!("SELECT COUNT(*) FROM {}", table_name);
204 let count: i64 = sqlx::query_scalar(&query)
205 .fetch_one(pool)
206 .await
207 .map_err(|e| {
208 DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
209 })?;
210
211 Ok(count as u64)
212 }
213
214 #[cfg(not(feature = "sqlite"))]
215 Err(feature_not_enabled_error("SQLite", "sqlite"))
216 }
217
218 async fn test_connection(&mut self) -> Result<bool, DataProfilerError> {
219 #[cfg(feature = "sqlite")]
220 {
221 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
222
223 let result: i32 = sqlx::query_scalar("SELECT 1")
224 .fetch_one(pool)
225 .await
226 .map_err(|e| {
227 DataProfilerError::database_query(&format!("Connection test failed: {}", e))
228 })?;
229
230 Ok(result == 1)
231 }
232
233 #[cfg(not(feature = "sqlite"))]
234 Err(feature_not_enabled_error("SQLite", "sqlite"))
235 }
236}