database_replicator/mysql/
reader.rs

1// ABOUTME: MySQL database introspection and data reading
2// ABOUTME: Provides read-only access to tables and rows with security validation
3
4use anyhow::{Context, Result};
5use mysql_async::{prelude::*, Conn, Row};
6
7/// List all user tables in a MySQL database
8///
9/// Queries INFORMATION_SCHEMA to discover all user tables, excluding system tables.
10/// Returns tables in alphabetical order.
11///
12/// # Arguments
13///
14/// * `conn` - MySQL connection
15/// * `db_name` - Database name to list tables from
16///
17/// # Returns
18///
19/// Vector of table names
20///
21/// # Examples
22///
23/// ```no_run
24/// # use database_replicator::mysql::{connect_mysql, reader::list_tables};
25/// # async fn example() -> anyhow::Result<()> {
26/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
27/// let tables = list_tables(&mut conn, "mydb").await?;
28/// println!("Found {} tables", tables.len());
29/// # Ok(())
30/// # }
31/// ```
32pub async fn list_tables(conn: &mut Conn, db_name: &str) -> Result<Vec<String>> {
33    tracing::info!("Listing tables from MySQL database '{}'", db_name);
34
35    let query = r#"
36        SELECT TABLE_NAME
37        FROM INFORMATION_SCHEMA.TABLES
38        WHERE TABLE_SCHEMA = ?
39        AND TABLE_TYPE = 'BASE TABLE'
40        ORDER BY TABLE_NAME
41    "#;
42
43    let tables: Vec<String> = conn
44        .exec(query, (db_name,))
45        .await
46        .with_context(|| format!("Failed to list tables from database '{}'", db_name))?;
47
48    tracing::info!("Found {} table(s) in database '{}'", tables.len(), db_name);
49
50    Ok(tables)
51}
52
53/// Get row count for a MySQL table
54///
55/// Executes COUNT(*) query to determine table size.
56///
57/// # Arguments
58///
59/// * `conn` - MySQL connection
60/// * `db_name` - Database name
61/// * `table_name` - Table name
62///
63/// # Returns
64///
65/// Number of rows in the table
66///
67/// # Examples
68///
69/// ```no_run
70/// # use database_replicator::mysql::{connect_mysql, reader::get_table_row_count};
71/// # async fn example() -> anyhow::Result<()> {
72/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
73/// let count = get_table_row_count(&mut conn, "mydb", "users").await?;
74/// println!("Table has {} rows", count);
75/// # Ok(())
76/// # }
77/// ```
78pub async fn get_table_row_count(
79    conn: &mut Conn,
80    db_name: &str,
81    table_name: &str,
82) -> Result<usize> {
83    // Validate table name to prevent injection
84    crate::jsonb::validate_table_name(table_name).context("Invalid table name for count query")?;
85
86    tracing::debug!("Getting row count for table '{}.{}'", db_name, table_name);
87
88    // Use backticks for identifiers to allow reserved words
89    let query = format!(
90        "SELECT COUNT(*) FROM {}.{}",
91        crate::utils::quote_mysql_ident(db_name),
92        crate::utils::quote_mysql_ident(table_name)
93    );
94
95    let count: Option<u64> = conn
96        .query_first(&query)
97        .await
98        .with_context(|| format!("Failed to count rows in table '{}'", table_name))?;
99
100    let count = count.unwrap_or(0) as usize;
101
102    tracing::debug!("Table '{}' has {} rows", table_name, count);
103
104    Ok(count)
105}
106
107/// Read all data from a MySQL table
108///
109/// Reads all rows from the table and returns them as MySQL Row objects.
110/// For large tables, this may consume significant memory.
111///
112/// # Arguments
113///
114/// * `conn` - MySQL connection
115/// * `db_name` - Database name
116/// * `table_name` - Table name to read from
117///
118/// # Returns
119///
120/// Vector of MySQL Row objects
121///
122/// # Security
123///
124/// Table name is validated to prevent SQL injection
125///
126/// # Examples
127///
128/// ```no_run
129/// # use database_replicator::mysql::{connect_mysql, reader::read_table_data};
130/// # async fn example() -> anyhow::Result<()> {
131/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
132/// let rows = read_table_data(&mut conn, "mydb", "users").await?;
133/// println!("Read {} rows", rows.len());
134/// # Ok(())
135/// # }
136/// ```
137pub async fn read_table_data(conn: &mut Conn, db_name: &str, table_name: &str) -> Result<Vec<Row>> {
138    // Validate table name to prevent injection
139    crate::jsonb::validate_table_name(table_name).context("Invalid table name for data reading")?;
140
141    tracing::info!("Reading all rows from table '{}.{}'", db_name, table_name);
142
143    // Use backticks for identifiers
144    let query = format!(
145        "SELECT * FROM {}.{}",
146        crate::utils::quote_mysql_ident(db_name),
147        crate::utils::quote_mysql_ident(table_name)
148    );
149
150    let rows: Vec<Row> = conn
151        .query(&query)
152        .await
153        .with_context(|| format!("Failed to read data from table '{}'", table_name))?;
154
155    tracing::info!("Read {} rows from table '{}'", rows.len(), table_name);
156
157    Ok(rows)
158}
159
160#[cfg(test)]
161mod tests {
162    #[test]
163    fn test_validate_table_names() {
164        // Valid table names should pass validation
165        let valid_names = vec!["users", "user_events", "UserData", "_private"];
166
167        for name in valid_names {
168            let result = crate::jsonb::validate_table_name(name);
169            assert!(
170                result.is_ok(),
171                "Valid table name '{}' should be accepted",
172                name
173            );
174        }
175    }
176
177    #[test]
178    fn test_reject_malicious_table_names() {
179        // Malicious table names should be rejected
180        let malicious_names = vec![
181            "users; DROP TABLE users;",
182            "users' OR '1'='1",
183            "../etc/passwd",
184            "users--",
185        ];
186
187        for name in malicious_names {
188            let result = crate::jsonb::validate_table_name(name);
189            assert!(
190                result.is_err(),
191                "Malicious table name '{}' should be rejected",
192                name
193            );
194        }
195    }
196}