database_replicator/mysql/
mod.rs

1// ABOUTME: MySQL database reading utilities for replication to PostgreSQL
2// ABOUTME: Provides secure connection validation and read-only database access
3
4pub mod converter;
5pub mod reader;
6
7use anyhow::{bail, Context, Result};
8use mysql_async::{Conn, Opts};
9
10/// Validate a MySQL connection string to prevent injection attacks
11///
12/// Security checks:
13/// - Validates URL format (mysql:// prefix)
14/// - Ensures non-empty connection string
15/// - Prevents malformed URLs
16///
17/// # Arguments
18///
19/// * `connection_string` - MySQL connection URL
20///
21/// # Returns
22///
23/// Validated connection string if valid, error otherwise
24///
25/// # Security
26///
27/// CRITICAL: This function prevents connection string injection attacks
28///
29/// # Examples
30///
31/// ```
32/// # use database_replicator::mysql::validate_mysql_url;
33/// // Valid URLs
34/// assert!(validate_mysql_url("mysql://localhost:3306/mydb").is_ok());
35/// assert!(validate_mysql_url("mysql://user:pass@host:3306/db").is_ok());
36///
37/// // Invalid URLs
38/// assert!(validate_mysql_url("").is_err());
39/// assert!(validate_mysql_url("postgresql://host/db").is_err());
40/// ```
41pub fn validate_mysql_url(connection_string: &str) -> Result<String> {
42    if connection_string.is_empty() {
43        bail!("MySQL connection string cannot be empty");
44    }
45
46    if !connection_string.starts_with("mysql://") {
47        bail!("Invalid MySQL connection string. Must start with 'mysql://'");
48    }
49
50    tracing::debug!("Validated MySQL connection string");
51
52    Ok(connection_string.to_string())
53}
54
55/// Connect to MySQL database
56///
57/// Validates the connection string, creates a connection pool, and verifies
58/// connectivity by executing a simple query.
59///
60/// # Arguments
61///
62/// * `connection_string` - MySQL connection URL (mysql://...)
63///
64/// # Returns
65///
66/// MySQL connection if successful
67///
68/// # Errors
69///
70/// Returns error if:
71/// - Connection string is invalid
72/// - Cannot parse connection options
73/// - Cannot connect to MySQL server
74/// - Cannot verify connection (ping fails)
75///
76/// # Examples
77///
78/// ```no_run
79/// # use database_replicator::mysql::connect_mysql;
80/// # async fn example() -> anyhow::Result<()> {
81/// let conn = connect_mysql("mysql://user:pass@localhost:3306/mydb").await?;
82/// # Ok(())
83/// # }
84/// ```
85pub async fn connect_mysql(connection_string: &str) -> Result<Conn> {
86    // Validate connection string first
87    let validated_url = validate_mysql_url(connection_string)?;
88
89    tracing::info!("Connecting to MySQL database");
90
91    // Parse connection options
92    let opts = Opts::from_url(&validated_url)
93        .with_context(|| "Failed to parse MySQL connection options")?;
94
95    // Create connection
96    let conn = Conn::new(opts)
97        .await
98        .context("Failed to create MySQL connection")?;
99
100    tracing::debug!("Successfully connected to MySQL");
101
102    Ok(conn)
103}
104
105/// Extract database name from MySQL connection string
106///
107/// Parses the connection URL and extracts the database name if present.
108///
109/// # Arguments
110///
111/// * `connection_string` - MySQL connection URL
112///
113/// # Returns
114///
115/// Database name if present in URL, None otherwise
116///
117/// # Examples
118///
119/// ```
120/// # use database_replicator::mysql::extract_database_name;
121/// assert_eq!(
122///     extract_database_name("mysql://localhost:3306/mydb"),
123///     Some("mydb".to_string())
124/// );
125/// assert_eq!(
126///     extract_database_name("mysql://localhost:3306"),
127///     None
128/// );
129/// ```
130pub fn extract_database_name(connection_string: &str) -> Option<String> {
131    let opts = Opts::from_url(connection_string).ok()?;
132    opts.db_name().map(|s| s.to_string())
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// URLs whose scheme is not `mysql://` (or that have no scheme at all).
    const WRONG_PREFIX_URLS: [&str; 4] = [
        "postgresql://localhost/db",
        "mongodb://localhost/db",
        "http://localhost",
        "localhost:3306",
    ];

    /// Well-formed MySQL URLs; only the format is checked, nothing connects.
    const WELL_FORMED_URLS: [&str; 4] = [
        "mysql://localhost:3306",
        "mysql://localhost:3306/mydb",
        "mysql://user:pass@localhost:3306/mydb",
        "mysql://user@localhost/db",
    ];

    // An empty string must be rejected with the dedicated "empty" message.
    #[test]
    fn test_validate_empty_url() {
        let outcome = validate_mysql_url("");
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("cannot be empty"));
    }

    // Every non-`mysql://` URL must be rejected.
    #[test]
    fn test_validate_invalid_prefix() {
        for candidate in WRONG_PREFIX_URLS.iter() {
            assert!(
                validate_mysql_url(candidate).is_err(),
                "Invalid URL should be rejected: {}",
                candidate
            );
        }
    }

    // Every well-formed `mysql://` URL must be accepted.
    #[test]
    fn test_validate_valid_mysql_url() {
        for candidate in WELL_FORMED_URLS.iter() {
            assert!(
                validate_mysql_url(candidate).is_ok(),
                "Valid URL should be accepted: {}",
                candidate
            );
        }
    }

    // A path segment after the host is reported as the database name.
    #[test]
    fn test_extract_database_name_with_db() {
        let extracted = extract_database_name("mysql://localhost:3306/mydb");
        assert_eq!(extracted.as_deref(), Some("mydb"));
    }

    // Without a path segment there is no database name.
    #[test]
    fn test_extract_database_name_without_db() {
        let extracted = extract_database_name("mysql://localhost:3306");
        assert!(extracted.is_none());
    }

    // Credentials in the URL do not interfere with database-name extraction.
    #[test]
    fn test_extract_database_name_with_auth() {
        let extracted = extract_database_name("mysql://user:pass@localhost:3306/mydb");
        assert_eq!(extracted.as_deref(), Some("mydb"));
    }
}