database_replicator/mysql/mod.rs
1// ABOUTME: MySQL database reading utilities for replication to PostgreSQL
2// ABOUTME: Provides secure connection validation and read-only database access
3
4pub mod converter;
5pub mod reader;
6
7use anyhow::{bail, Context, Result};
8use mysql_async::{Conn, Opts};
9
10/// Validate a MySQL connection string to prevent injection attacks
11///
12/// Security checks:
13/// - Validates URL format (mysql:// prefix)
14/// - Ensures non-empty connection string
15/// - Prevents malformed URLs
16///
17/// # Arguments
18///
19/// * `connection_string` - MySQL connection URL
20///
21/// # Returns
22///
23/// Validated connection string if valid, error otherwise
24///
25/// # Security
26///
27/// CRITICAL: This function prevents connection string injection attacks
28///
29/// # Examples
30///
31/// ```
32/// # use database_replicator::mysql::validate_mysql_url;
33/// // Valid URLs
34/// assert!(validate_mysql_url("mysql://localhost:3306/mydb").is_ok());
35/// assert!(validate_mysql_url("mysql://user:pass@host:3306/db").is_ok());
36///
37/// // Invalid URLs
38/// assert!(validate_mysql_url("").is_err());
39/// assert!(validate_mysql_url("postgresql://host/db").is_err());
40/// ```
41pub fn validate_mysql_url(connection_string: &str) -> Result<String> {
42 if connection_string.is_empty() {
43 bail!("MySQL connection string cannot be empty");
44 }
45
46 if !connection_string.starts_with("mysql://") {
47 bail!("Invalid MySQL connection string. Must start with 'mysql://'");
48 }
49
50 tracing::debug!("Validated MySQL connection string");
51
52 Ok(connection_string.to_string())
53}
54
55/// Connect to MySQL database
56///
57/// Validates the connection string, creates a connection pool, and verifies
58/// connectivity by executing a simple query.
59///
60/// # Arguments
61///
62/// * `connection_string` - MySQL connection URL (mysql://...)
63///
64/// # Returns
65///
66/// MySQL connection if successful
67///
68/// # Errors
69///
70/// Returns error if:
71/// - Connection string is invalid
72/// - Cannot parse connection options
73/// - Cannot connect to MySQL server
74/// - Cannot verify connection (ping fails)
75///
76/// # Examples
77///
78/// ```no_run
79/// # use database_replicator::mysql::connect_mysql;
80/// # async fn example() -> anyhow::Result<()> {
81/// let conn = connect_mysql("mysql://user:pass@localhost:3306/mydb").await?;
82/// # Ok(())
83/// # }
84/// ```
85pub async fn connect_mysql(connection_string: &str) -> Result<Conn> {
86 // Validate connection string first
87 let validated_url = validate_mysql_url(connection_string)?;
88
89 tracing::info!("Connecting to MySQL database");
90
91 // Parse connection options
92 let opts = Opts::from_url(&validated_url)
93 .with_context(|| "Failed to parse MySQL connection options")?;
94
95 // Create connection
96 let conn = Conn::new(opts)
97 .await
98 .context("Failed to create MySQL connection")?;
99
100 tracing::debug!("Successfully connected to MySQL");
101
102 Ok(conn)
103}
104
105/// Extract database name from MySQL connection string
106///
107/// Parses the connection URL and extracts the database name if present.
108///
109/// # Arguments
110///
111/// * `connection_string` - MySQL connection URL
112///
113/// # Returns
114///
115/// Database name if present in URL, None otherwise
116///
117/// # Examples
118///
119/// ```
120/// # use database_replicator::mysql::extract_database_name;
121/// assert_eq!(
122/// extract_database_name("mysql://localhost:3306/mydb"),
123/// Some("mydb".to_string())
124/// );
125/// assert_eq!(
126/// extract_database_name("mysql://localhost:3306"),
127/// None
128/// );
129/// ```
130pub fn extract_database_name(connection_string: &str) -> Option<String> {
131 let opts = Opts::from_url(connection_string).ok()?;
132 opts.db_name().map(|s| s.to_string())
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138
139 #[test]
140 fn test_validate_empty_url() {
141 let result = validate_mysql_url("");
142 assert!(result.is_err());
143 assert!(result.unwrap_err().to_string().contains("cannot be empty"));
144 }
145
146 #[test]
147 fn test_validate_invalid_prefix() {
148 let invalid_urls = vec![
149 "postgresql://localhost/db",
150 "mongodb://localhost/db",
151 "http://localhost",
152 "localhost:3306",
153 ];
154
155 for url in invalid_urls {
156 let result = validate_mysql_url(url);
157 assert!(result.is_err(), "Invalid URL should be rejected: {}", url);
158 }
159 }
160
161 #[test]
162 fn test_validate_valid_mysql_url() {
163 // Note: This test validates URL format, not actual connection
164 let valid_urls = vec![
165 "mysql://localhost:3306",
166 "mysql://localhost:3306/mydb",
167 "mysql://user:pass@localhost:3306/mydb",
168 "mysql://user@localhost/db",
169 ];
170
171 for url in valid_urls {
172 let result = validate_mysql_url(url);
173 assert!(result.is_ok(), "Valid URL should be accepted: {}", url);
174 }
175 }
176
177 #[test]
178 fn test_extract_database_name_with_db() {
179 let url = "mysql://localhost:3306/mydb";
180 let db_name = extract_database_name(url);
181 assert_eq!(db_name, Some("mydb".to_string()));
182 }
183
184 #[test]
185 fn test_extract_database_name_without_db() {
186 let url = "mysql://localhost:3306";
187 let db_name = extract_database_name(url);
188 assert_eq!(db_name, None);
189 }
190
191 #[test]
192 fn test_extract_database_name_with_auth() {
193 let url = "mysql://user:pass@localhost:3306/mydb";
194 let db_name = extract_database_name(url);
195 assert_eq!(db_name, Some("mydb".to_string()));
196 }
197}