Skip to main content

dataprof_db/connectors/
mysql.rs

1//! MySQL/MariaDB database connector
2
3#[cfg(not(feature = "mysql"))]
4use super::common::feature_not_enabled_error;
5#[cfg(feature = "mysql")]
6use super::common::{build_count_query, not_connected_error};
7use crate::connection::ConnectionInfo;
8#[cfg(feature = "mysql")]
9use crate::security::validate_sql_identifier;
10use crate::{DataProfilerError, DatabaseConfig, DatabaseConnector};
11#[cfg(feature = "mysql")]
12use crate::{process_rows_to_columns, streaming_profile_loop};
13use async_trait::async_trait;
14use std::collections::HashMap;
15
16#[cfg(feature = "mysql")]
17use {sqlx::mysql::MySqlPool, sqlx::mysql::MySqlPoolOptions};
18
19/// MySQL/MariaDB connector
20pub struct MySqlConnector {
21    #[allow(dead_code)]
22    config: DatabaseConfig,
23    #[allow(dead_code)]
24    connection_info: ConnectionInfo,
25    #[cfg(feature = "mysql")]
26    pool: Option<MySqlPool>,
27    #[cfg(not(feature = "mysql"))]
28    #[allow(dead_code)]
29    pool: Option<()>,
30}
31
32impl MySqlConnector {
33    /// Create a new MySQL connector
34    pub fn new(config: DatabaseConfig) -> Result<Self, DataProfilerError> {
35        let connection_info = ConnectionInfo::parse(&config.connection_string)?;
36
37        if connection_info.database_type() != "mysql" {
38            return Err(DataProfilerError::DatabaseConfigError {
39                message: format!(
40                    "Invalid connection string for MySQL: {}",
41                    config.connection_string
42                ),
43            });
44        }
45
46        Ok(Self {
47            config,
48            connection_info,
49            pool: None,
50        })
51    }
52}
53
54#[async_trait]
55impl DatabaseConnector for MySqlConnector {
56    async fn connect(&mut self) -> Result<(), DataProfilerError> {
57        #[cfg(feature = "mysql")]
58        {
59            let connection_string = self.connection_info.to_connection_string("sqlx");
60
61            let pool = MySqlPoolOptions::new()
62                .max_connections(self.config.max_connections.unwrap_or(10))
63                .acquire_timeout(
64                    self.config
65                        .connection_timeout
66                        .unwrap_or(std::time::Duration::from_secs(30)),
67                )
68                .connect(&connection_string)
69                .await
70                .map_err(|e| {
71                    DataProfilerError::database_connection(&format!(
72                        "Failed to connect to MySQL: {}",
73                        e
74                    ))
75                })?;
76
77            self.pool = Some(pool);
78            Ok(())
79        }
80
81        #[cfg(not(feature = "mysql"))]
82        {
83            Err(DataProfilerError::database_feature_disabled(
84                "MySQL", "mysql",
85            ))
86        }
87    }
88
89    async fn disconnect(&mut self) -> Result<(), DataProfilerError> {
90        #[cfg(feature = "mysql")]
91        {
92            if let Some(pool) = &self.pool {
93                pool.close().await;
94                self.pool = None;
95            }
96        }
97        Ok(())
98    }
99
100    #[allow(unused_variables)]
101    async fn profile_query(
102        &mut self,
103        query: &str,
104    ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
105        #[cfg(feature = "mysql")]
106        {
107            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
108
109            let rows = sqlx::query(query).fetch_all(pool).await.map_err(|e| {
110                DataProfilerError::database_query(&format!("Query execution failed: {}", e))
111            })?;
112
113            Ok(process_rows_to_columns!(rows))
114        }
115
116        #[cfg(not(feature = "mysql"))]
117        Err(feature_not_enabled_error("MySQL", "mysql"))
118    }
119
120    #[allow(unused_variables)]
121    async fn profile_query_streaming(
122        &mut self,
123        query: &str,
124        batch_size: usize,
125    ) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
126        #[cfg(feature = "mysql")]
127        {
128            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
129
130            let count_query = build_count_query(query)?;
131            let total_rows: i64 = sqlx::query_scalar(&count_query)
132                .fetch_one(pool)
133                .await
134                .map_err(|e| {
135                    DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
136                })?;
137
138            streaming_profile_loop!(pool, query, batch_size, total_rows, "MySQL")
139        }
140
141        #[cfg(not(feature = "mysql"))]
142        Err(feature_not_enabled_error("MySQL", "mysql"))
143    }
144
145    #[allow(unused_variables)]
146    async fn get_table_schema(
147        &mut self,
148        table_name: &str,
149    ) -> Result<Vec<String>, DataProfilerError> {
150        #[cfg(feature = "mysql")]
151        {
152            use sqlx::Row;
153
154            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
155
156            let query = r#"
157                SELECT COLUMN_NAME
158                FROM INFORMATION_SCHEMA.COLUMNS
159                WHERE TABLE_NAME = ?
160                ORDER BY ORDINAL_POSITION
161            "#;
162
163            let rows = sqlx::query(query)
164                .bind(table_name)
165                .fetch_all(pool)
166                .await
167                .map_err(|e| {
168                    DataProfilerError::database_query(&format!("Failed to get table schema: {}", e))
169                })?;
170
171            let mut columns = Vec::new();
172            for row in rows {
173                let column_name: String = row.try_get(0).map_err(|e| {
174                    DataProfilerError::database_query(&format!("Failed to read column name: {}", e))
175                })?;
176                columns.push(column_name);
177            }
178
179            Ok(columns)
180        }
181
182        #[cfg(not(feature = "mysql"))]
183        Err(feature_not_enabled_error("MySQL", "mysql"))
184    }
185
186    #[allow(unused_variables)]
187    async fn count_table_rows(&mut self, table_name: &str) -> Result<u64, DataProfilerError> {
188        #[cfg(feature = "mysql")]
189        {
190            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
191
192            validate_sql_identifier(table_name)?;
193            let query = format!("SELECT COUNT(*) FROM {}", table_name);
194            let count: i64 = sqlx::query_scalar(&query)
195                .fetch_one(pool)
196                .await
197                .map_err(|e| {
198                    DataProfilerError::database_query(&format!("Failed to count rows: {}", e))
199                })?;
200
201            Ok(count as u64)
202        }
203
204        #[cfg(not(feature = "mysql"))]
205        Err(feature_not_enabled_error("MySQL", "mysql"))
206    }
207
208    async fn test_connection(&mut self) -> Result<bool, DataProfilerError> {
209        #[cfg(feature = "mysql")]
210        {
211            let pool = self.pool.as_ref().ok_or_else(not_connected_error)?;
212
213            let result: i32 = sqlx::query_scalar("SELECT 1")
214                .fetch_one(pool)
215                .await
216                .map_err(|e| {
217                    DataProfilerError::database_query(&format!("Connection test failed: {}", e))
218                })?;
219
220            Ok(result == 1)
221        }
222
223        #[cfg(not(feature = "mysql"))]
224        Err(feature_not_enabled_error("MySQL", "mysql"))
225    }
226}