database_replicator/mysql/
reader.rs

1// ABOUTME: MySQL database introspection and data reading
2// ABOUTME: Provides read-only access to tables and rows with security validation
3
4use anyhow::{Context, Result};
5use mysql_async::{prelude::*, Conn, Row};
6
7/// List all user tables in a MySQL database
8///
9/// Queries INFORMATION_SCHEMA to discover all user tables, excluding system tables.
10/// Returns tables in alphabetical order.
11///
12/// # Arguments
13///
14/// * `conn` - MySQL connection
15/// * `db_name` - Database name to list tables from
16///
17/// # Returns
18///
19/// Vector of table names
20///
21/// # Examples
22///
23/// ```no_run
24/// # use database_replicator::mysql::{connect_mysql, reader::list_tables};
25/// # async fn example() -> anyhow::Result<()> {
26/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
27/// let tables = list_tables(&mut conn, "mydb").await?;
28/// println!("Found {} tables", tables.len());
29/// # Ok(())
30/// # }
31/// ```
32pub async fn list_tables(conn: &mut Conn, db_name: &str) -> Result<Vec<String>> {
33    tracing::info!("Listing tables from MySQL database '{}'", db_name);
34
35    let query = r#"
36        SELECT TABLE_NAME
37        FROM INFORMATION_SCHEMA.TABLES
38        WHERE TABLE_SCHEMA = ?
39        AND TABLE_TYPE = 'BASE TABLE'
40        ORDER BY TABLE_NAME
41    "#;
42
43    let tables: Vec<String> = conn
44        .exec(query, (db_name,))
45        .await
46        .with_context(|| format!("Failed to list tables from database '{}'", db_name))?;
47
48    tracing::info!("Found {} table(s) in database '{}'", tables.len(), db_name);
49
50    Ok(tables)
51}
52
53/// Get row count for a MySQL table
54///
55/// Executes COUNT(*) query to determine table size.
56///
57/// # Arguments
58///
59/// * `conn` - MySQL connection
60/// * `db_name` - Database name
61/// * `table_name` - Table name
62///
63/// # Returns
64///
65/// Number of rows in the table
66///
67/// # Examples
68///
69/// ```no_run
70/// # use database_replicator::mysql::{connect_mysql, reader::get_table_row_count};
71/// # async fn example() -> anyhow::Result<()> {
72/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
73/// let count = get_table_row_count(&mut conn, "mydb", "users").await?;
74/// println!("Table has {} rows", count);
75/// # Ok(())
76/// # }
77/// ```
78pub async fn get_table_row_count(
79    conn: &mut Conn,
80    db_name: &str,
81    table_name: &str,
82) -> Result<usize> {
83    // Validate table name to prevent injection
84    crate::jsonb::validate_table_name(table_name).context("Invalid table name for count query")?;
85
86    tracing::debug!("Getting row count for table '{}.{}'", db_name, table_name);
87
88    // Use backticks for identifiers to allow reserved words
89    let query = format!("SELECT COUNT(*) FROM `{}`.`{}`", db_name, table_name);
90
91    let count: Option<u64> = conn
92        .query_first(&query)
93        .await
94        .with_context(|| format!("Failed to count rows in table '{}'", table_name))?;
95
96    let count = count.unwrap_or(0) as usize;
97
98    tracing::debug!("Table '{}' has {} rows", table_name, count);
99
100    Ok(count)
101}
102
103/// Read all data from a MySQL table
104///
105/// Reads all rows from the table and returns them as MySQL Row objects.
106/// For large tables, this may consume significant memory.
107///
108/// # Arguments
109///
110/// * `conn` - MySQL connection
111/// * `db_name` - Database name
112/// * `table_name` - Table name to read from
113///
114/// # Returns
115///
116/// Vector of MySQL Row objects
117///
118/// # Security
119///
120/// Table name is validated to prevent SQL injection
121///
122/// # Examples
123///
124/// ```no_run
125/// # use database_replicator::mysql::{connect_mysql, reader::read_table_data};
126/// # async fn example() -> anyhow::Result<()> {
127/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
128/// let rows = read_table_data(&mut conn, "mydb", "users").await?;
129/// println!("Read {} rows", rows.len());
130/// # Ok(())
131/// # }
132/// ```
133pub async fn read_table_data(conn: &mut Conn, db_name: &str, table_name: &str) -> Result<Vec<Row>> {
134    // Validate table name to prevent injection
135    crate::jsonb::validate_table_name(table_name).context("Invalid table name for data reading")?;
136
137    tracing::info!("Reading all rows from table '{}.{}'", db_name, table_name);
138
139    // Use backticks for identifiers
140    let query = format!("SELECT * FROM `{}`.`{}`", db_name, table_name);
141
142    let rows: Vec<Row> = conn
143        .query(&query)
144        .await
145        .with_context(|| format!("Failed to read data from table '{}'", table_name))?;
146
147    tracing::info!("Read {} rows from table '{}'", rows.len(), table_name);
148
149    Ok(rows)
150}
151
#[cfg(test)]
mod tests {
    /// Each sample name below is an ordinary identifier and must be accepted.
    #[test]
    fn test_validate_table_names() {
        for &name in &["users", "user_events", "UserData", "_private"] {
            assert!(
                crate::jsonb::validate_table_name(name).is_ok(),
                "Valid table name '{}' should be accepted",
                name
            );
        }
    }

    /// Each sample below carries an injection or path-traversal payload and must
    /// be rejected.
    #[test]
    fn test_reject_malicious_table_names() {
        for &name in &[
            "users; DROP TABLE users;",
            "users' OR '1'='1",
            "../etc/passwd",
            "users--",
        ] {
            assert!(
                crate::jsonb::validate_table_name(name).is_err(),
                "Malicious table name '{}' should be rejected",
                name
            );
        }
    }
}