database_replicator/mysql/mod.rs
1// ABOUTME: MySQL database reading utilities for replication to PostgreSQL
2// ABOUTME: Provides secure connection validation and read-only database access
3
4pub mod converter;
5pub mod reader;
6
7use anyhow::{bail, Context, Result};
8use mysql_async::{Conn, Opts};
9
10/// Validate a MySQL connection string to prevent injection attacks
11///
12/// Security checks:
13/// - Validates URL format (mysql:// prefix)
14/// - Ensures non-empty connection string
15/// - Prevents malformed URLs
16///
17/// # Arguments
18///
19/// * `connection_string` - MySQL connection URL
20///
21/// # Returns
22///
23/// Validated connection string if valid, error otherwise
24///
25/// # Security
26///
27/// CRITICAL: This function prevents connection string injection attacks
28///
29/// # Examples
30///
31/// ```
32/// # use database_replicator::mysql::validate_mysql_url;
33/// // Valid URLs
34/// assert!(validate_mysql_url("mysql://localhost:3306/mydb").is_ok());
35/// assert!(validate_mysql_url("mysql://user:pass@host:3306/db").is_ok());
36///
37/// // Invalid URLs
38/// assert!(validate_mysql_url("").is_err());
39/// assert!(validate_mysql_url("postgresql://host/db").is_err());
40/// ```
41pub fn validate_mysql_url(connection_string: &str) -> Result<String> {
42 if connection_string.is_empty() {
43 bail!("MySQL connection string cannot be empty");
44 }
45
46 if !connection_string.starts_with("mysql://") {
47 bail!(
48 "Invalid MySQL connection string '{}'. \
49 Must start with 'mysql://'",
50 connection_string
51 );
52 }
53
54 tracing::debug!("Validated MySQL connection string");
55
56 Ok(connection_string.to_string())
57}
58
59/// Connect to MySQL database
60///
61/// Validates the connection string, creates a connection pool, and verifies
62/// connectivity by executing a simple query.
63///
64/// # Arguments
65///
66/// * `connection_string` - MySQL connection URL (mysql://...)
67///
68/// # Returns
69///
70/// MySQL connection if successful
71///
72/// # Errors
73///
74/// Returns error if:
75/// - Connection string is invalid
76/// - Cannot parse connection options
77/// - Cannot connect to MySQL server
78/// - Cannot verify connection (ping fails)
79///
80/// # Examples
81///
82/// ```no_run
83/// # use database_replicator::mysql::connect_mysql;
84/// # async fn example() -> anyhow::Result<()> {
85/// let conn = connect_mysql("mysql://user:pass@localhost:3306/mydb").await?;
86/// # Ok(())
87/// # }
88/// ```
89pub async fn connect_mysql(connection_string: &str) -> Result<Conn> {
90 // Validate connection string first
91 let validated_url = validate_mysql_url(connection_string)?;
92
93 tracing::info!("Connecting to MySQL database");
94
95 // Parse connection options
96 let opts = Opts::from_url(&validated_url)
97 .with_context(|| "Failed to parse MySQL connection options")?;
98
99 // Create connection
100 let conn = Conn::new(opts)
101 .await
102 .context("Failed to create MySQL connection")?;
103
104 tracing::debug!("Successfully connected to MySQL");
105
106 Ok(conn)
107}
108
109/// Extract database name from MySQL connection string
110///
111/// Parses the connection URL and extracts the database name if present.
112///
113/// # Arguments
114///
115/// * `connection_string` - MySQL connection URL
116///
117/// # Returns
118///
119/// Database name if present in URL, None otherwise
120///
121/// # Examples
122///
123/// ```
124/// # use database_replicator::mysql::extract_database_name;
125/// assert_eq!(
126/// extract_database_name("mysql://localhost:3306/mydb"),
127/// Some("mydb".to_string())
128/// );
129/// assert_eq!(
130/// extract_database_name("mysql://localhost:3306"),
131/// None
132/// );
133/// ```
134pub fn extract_database_name(connection_string: &str) -> Option<String> {
135 let opts = Opts::from_url(connection_string).ok()?;
136 opts.db_name().map(|s| s.to_string())
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty string is rejected with the dedicated "empty" message.
    #[test]
    fn test_validate_empty_url() {
        let outcome = validate_mysql_url("");
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("cannot be empty"));
    }

    /// Anything that does not begin with `mysql://` is rejected.
    #[test]
    fn test_validate_invalid_prefix() {
        let rejected = [
            "postgresql://localhost/db",
            "mongodb://localhost/db",
            "http://localhost",
            "localhost:3306",
        ];

        for url in rejected.iter() {
            assert!(
                validate_mysql_url(url).is_err(),
                "Invalid URL should be rejected: {}",
                url
            );
        }
    }

    /// Well-formed `mysql://` URLs pass validation.
    #[test]
    fn test_validate_valid_mysql_url() {
        // Only the URL format is checked here; no connection is attempted.
        let accepted = [
            "mysql://localhost:3306",
            "mysql://localhost:3306/mydb",
            "mysql://user:pass@localhost:3306/mydb",
            "mysql://user@localhost/db",
        ];

        for url in accepted.iter() {
            assert!(
                validate_mysql_url(url).is_ok(),
                "Valid URL should be accepted: {}",
                url
            );
        }
    }

    /// The path component of the URL is reported as the database name.
    #[test]
    fn test_extract_database_name_with_db() {
        let extracted = extract_database_name("mysql://localhost:3306/mydb");
        assert_eq!(extracted.as_deref(), Some("mydb"));
    }

    /// A URL without a path component yields no database name.
    #[test]
    fn test_extract_database_name_without_db() {
        let extracted = extract_database_name("mysql://localhost:3306");
        assert_eq!(extracted, None);
    }

    /// Credentials in the URL do not interfere with database name extraction.
    #[test]
    fn test_extract_database_name_with_auth() {
        let extracted = extract_database_name("mysql://user:pass@localhost:3306/mydb");
        assert_eq!(extracted.as_deref(), Some("mydb"));
    }
}
201}