dataprof_db/connectors/
mysql.rs1#[cfg(not(feature = "mysql"))]
4use super::common::feature_not_enabled_error;
5#[cfg(feature = "mysql")]
6use super::common::{build_count_query, not_connected_error};
7use crate::connection::ConnectionInfo;
8#[cfg(feature = "mysql")]
9use crate::security::validate_sql_identifier;
10use crate::{DataProfilerError, DatabaseConfig, DatabaseConnector};
11#[cfg(feature = "mysql")]
12use crate::{process_rows_to_columns, streaming_profile_loop};
13use async_trait::async_trait;
14use std::collections::HashMap;
15
16#[cfg(feature = "mysql")]
17use {sqlx::mysql::MySqlPool, sqlx::mysql::MySqlPoolOptions};
18
19pub struct MySqlConnector {
21 #[allow(dead_code)]
22 config: DatabaseConfig,
23 #[allow(dead_code)]
24 connection_info: ConnectionInfo,
25 #[cfg(feature = "mysql")]
26 pool: Option<MySqlPool>,
27 #[cfg(not(feature = "mysql"))]
28 #[allow(dead_code)]
29 pool: Option<()>,
30}
31
32impl MySqlConnector {
33 pub fn new(config: DatabaseConfig) -> Result<Self, DataProfilerError> {
35 let connection_info = ConnectionInfo::parse(&config.connection_string)?;
36
37 if connection_info.database_type() != "mysql" {
38 return Err(DataProfilerError::DatabaseConfigError {
39 message: format!(
40 "Invalid connection string for MySQL: {}",
41 config.connection_string
42 ),
43 });
44 }
45
46 Ok(Self {
47 config,
48 connection_info,
49 pool: None,
50 })
51 }
52}
53
54#[async_trait]
55impl DatabaseConnector for MySqlConnector {
56 async fn connect(&mut self) -> Result<(), DataProfilerError> {
57 #[cfg(feature = "mysql")]
58 {
59 let connection_string = self.connection_info.to_connection_string("sqlx");
60
61 let pool = MySqlPoolOptions::new()
62 .max_connections(self.config.max_connections.unwrap_or(10))
63 .acquire_timeout(
64 self.config
65 .connection_timeout
66 .unwrap_or(std::time::Duration::from_secs(30)),
67 )
68 .connect(&connection_string)
69 .await
70 .map_err(|e| {
71 DataProfilerError::database_connection(&format!(
72 "Failed to connect to MySQL: {}",
73 e
74 ))
75 })?;
76
77 self.pool = Some(pool);
78 Ok(())
79 }
80
81 #[cfg(not(feature = "mysql"))]
82 {
83 Err(DataProfilerError::database_feature_disabled(
84 "MySQL", "mysql",
85 ))
86 }
87 }
88
89 async fn disconnect(&mut self) -> Result<(), DataProfilerError> {
90 #[cfg(feature = "mysql")]
91 {
92 if let Some(pool) = &self.pool {
93 pool.close().await;
94 self.pool = None;
95 }
96 }
97 Ok(())
98 }
99
100 #[allow(unused_variables)]
101 async fn profile_query(
102 &mut self,
103 query: &str,
104 ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
105 #[cfg(feature = "mysql")]
106 {
107 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
108
109 let rows = sqlx::query(query).fetch_all(pool).await.map_err(|e| {
110 DataProfilerError::database_query(&format!("Query execution failed: {}", e))
111 })?;
112
113 Ok(process_rows_to_columns!(rows))
114 }
115
116 #[cfg(not(feature = "mysql"))]
117 Err(feature_not_enabled_error("MySQL", "mysql"))
118 }
119
120 #[allow(unused_variables)]
121 async fn profile_query_streaming(
122 &mut self,
123 query: &str,
124 batch_size: usize,
125 ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
126 #[cfg(feature = "mysql")]
127 {
128 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
129
130 let count_query = build_count_query(query)?;
131 let total_rows: i64 = sqlx::query_scalar(&count_query)
132 .fetch_one(pool)
133 .await
134 .map_err(|e| {
135 DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
136 })?;
137
138 streaming_profile_loop!(pool, query, batch_size, total_rows, "MySQL")
139 }
140
141 #[cfg(not(feature = "mysql"))]
142 Err(feature_not_enabled_error("MySQL", "mysql"))
143 }
144
145 #[allow(unused_variables)]
146 async fn get_table_schema(
147 &mut self,
148 table_name: &str,
149 ) -> Result<Vec<String>, DataProfilerError> {
150 #[cfg(feature = "mysql")]
151 {
152 use sqlx::Row;
153
154 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
155
156 let query = r#"
157 SELECT COLUMN_NAME
158 FROM INFORMATION_SCHEMA.COLUMNS
159 WHERE TABLE_NAME = ?
160 ORDER BY ORDINAL_POSITION
161 "#;
162
163 let rows = sqlx::query(query)
164 .bind(table_name)
165 .fetch_all(pool)
166 .await
167 .map_err(|e| {
168 DataProfilerError::database_query(&format!("Failed to get table schema: {}", e))
169 })?;
170
171 let mut columns = Vec::new();
172 for row in rows {
173 let column_name: String = row.try_get(0).map_err(|e| {
174 DataProfilerError::database_query(&format!("Failed to read column name: {}", e))
175 })?;
176 columns.push(column_name);
177 }
178
179 Ok(columns)
180 }
181
182 #[cfg(not(feature = "mysql"))]
183 Err(feature_not_enabled_error("MySQL", "mysql"))
184 }
185
186 #[allow(unused_variables)]
187 async fn count_table_rows(&mut self, table_name: &str) -> Result<u64, DataProfilerError> {
188 #[cfg(feature = "mysql")]
189 {
190 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
191
192 validate_sql_identifier(table_name)?;
193 let query = format!("SELECT COUNT(*) FROM {}", table_name);
194 let count: i64 = sqlx::query_scalar(&query)
195 .fetch_one(pool)
196 .await
197 .map_err(|e| {
198 DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
199 })?;
200
201 Ok(count as u64)
202 }
203
204 #[cfg(not(feature = "mysql"))]
205 Err(feature_not_enabled_error("MySQL", "mysql"))
206 }
207
208 async fn test_connection(&mut self) -> Result<bool, DataProfilerError> {
209 #[cfg(feature = "mysql")]
210 {
211 let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
212
213 let result: i32 = sqlx::query_scalar("SELECT 1")
214 .fetch_one(pool)
215 .await
216 .map_err(|e| {
217 DataProfilerError::database_query(&format!("Connection test failed: {}", e))
218 })?;
219
220 Ok(result == 1)
221 }
222
223 #[cfg(not(feature = "mysql"))]
224 Err(feature_not_enabled_error("MySQL", "mysql"))
225 }
226}