database_replicator/mysql/
mod.rs

1// ABOUTME: MySQL database reading utilities for replication to PostgreSQL
2// ABOUTME: Provides secure connection validation and read-only database access
3
4pub mod converter;
5pub mod reader;
6
7use anyhow::{bail, Context, Result};
8use mysql_async::{Conn, Opts};
9
10/// Validate a MySQL connection string to prevent injection attacks
11///
12/// Security checks:
13/// - Validates URL format (mysql:// prefix)
14/// - Ensures non-empty connection string
15/// - Prevents malformed URLs
16///
17/// # Arguments
18///
19/// * `connection_string` - MySQL connection URL
20///
21/// # Returns
22///
23/// Validated connection string if valid, error otherwise
24///
25/// # Security
26///
27/// CRITICAL: This function prevents connection string injection attacks
28///
29/// # Examples
30///
31/// ```
32/// # use database_replicator::mysql::validate_mysql_url;
33/// // Valid URLs
34/// assert!(validate_mysql_url("mysql://localhost:3306/mydb").is_ok());
35/// assert!(validate_mysql_url("mysql://user:pass@host:3306/db").is_ok());
36///
37/// // Invalid URLs
38/// assert!(validate_mysql_url("").is_err());
39/// assert!(validate_mysql_url("postgresql://host/db").is_err());
40/// ```
41pub fn validate_mysql_url(connection_string: &str) -> Result<String> {
42    if connection_string.is_empty() {
43        bail!("MySQL connection string cannot be empty");
44    }
45
46    if !connection_string.starts_with("mysql://") {
47        bail!(
48            "Invalid MySQL connection string '{}'. \
49             Must start with 'mysql://'",
50            connection_string
51        );
52    }
53
54    tracing::debug!("Validated MySQL connection string");
55
56    Ok(connection_string.to_string())
57}
58
59/// Connect to MySQL database
60///
61/// Validates the connection string, creates a connection pool, and verifies
62/// connectivity by executing a simple query.
63///
64/// # Arguments
65///
66/// * `connection_string` - MySQL connection URL (mysql://...)
67///
68/// # Returns
69///
70/// MySQL connection if successful
71///
72/// # Errors
73///
74/// Returns error if:
75/// - Connection string is invalid
76/// - Cannot parse connection options
77/// - Cannot connect to MySQL server
78/// - Cannot verify connection (ping fails)
79///
80/// # Examples
81///
82/// ```no_run
83/// # use database_replicator::mysql::connect_mysql;
84/// # async fn example() -> anyhow::Result<()> {
85/// let conn = connect_mysql("mysql://user:pass@localhost:3306/mydb").await?;
86/// # Ok(())
87/// # }
88/// ```
89pub async fn connect_mysql(connection_string: &str) -> Result<Conn> {
90    // Validate connection string first
91    let validated_url = validate_mysql_url(connection_string)?;
92
93    tracing::info!("Connecting to MySQL database");
94
95    // Parse connection options
96    let opts = Opts::from_url(&validated_url)
97        .with_context(|| "Failed to parse MySQL connection options")?;
98
99    // Create connection
100    let conn = Conn::new(opts)
101        .await
102        .context("Failed to create MySQL connection")?;
103
104    tracing::debug!("Successfully connected to MySQL");
105
106    Ok(conn)
107}
108
109/// Extract database name from MySQL connection string
110///
111/// Parses the connection URL and extracts the database name if present.
112///
113/// # Arguments
114///
115/// * `connection_string` - MySQL connection URL
116///
117/// # Returns
118///
119/// Database name if present in URL, None otherwise
120///
121/// # Examples
122///
123/// ```
124/// # use database_replicator::mysql::extract_database_name;
125/// assert_eq!(
126///     extract_database_name("mysql://localhost:3306/mydb"),
127///     Some("mydb".to_string())
128/// );
129/// assert_eq!(
130///     extract_database_name("mysql://localhost:3306"),
131///     None
132/// );
133/// ```
134pub fn extract_database_name(connection_string: &str) -> Option<String> {
135    let opts = Opts::from_url(connection_string).ok()?;
136    opts.db_name().map(|s| s.to_string())
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142
143    #[test]
144    fn test_validate_empty_url() {
145        let result = validate_mysql_url("");
146        assert!(result.is_err());
147        assert!(result.unwrap_err().to_string().contains("cannot be empty"));
148    }
149
150    #[test]
151    fn test_validate_invalid_prefix() {
152        let invalid_urls = vec![
153            "postgresql://localhost/db",
154            "mongodb://localhost/db",
155            "http://localhost",
156            "localhost:3306",
157        ];
158
159        for url in invalid_urls {
160            let result = validate_mysql_url(url);
161            assert!(result.is_err(), "Invalid URL should be rejected: {}", url);
162        }
163    }
164
165    #[test]
166    fn test_validate_valid_mysql_url() {
167        // Note: This test validates URL format, not actual connection
168        let valid_urls = vec![
169            "mysql://localhost:3306",
170            "mysql://localhost:3306/mydb",
171            "mysql://user:pass@localhost:3306/mydb",
172            "mysql://user@localhost/db",
173        ];
174
175        for url in valid_urls {
176            let result = validate_mysql_url(url);
177            assert!(result.is_ok(), "Valid URL should be accepted: {}", url);
178        }
179    }
180
181    #[test]
182    fn test_extract_database_name_with_db() {
183        let url = "mysql://localhost:3306/mydb";
184        let db_name = extract_database_name(url);
185        assert_eq!(db_name, Some("mydb".to_string()));
186    }
187
188    #[test]
189    fn test_extract_database_name_without_db() {
190        let url = "mysql://localhost:3306";
191        let db_name = extract_database_name(url);
192        assert_eq!(db_name, None);
193    }
194
195    #[test]
196    fn test_extract_database_name_with_auth() {
197        let url = "mysql://user:pass@localhost:3306/mydb";
198        let db_name = extract_database_name(url);
199        assert_eq!(db_name, Some("mydb".to_string()));
200    }
201}