database_replicator/mysql/reader.rs
1// ABOUTME: MySQL database introspection and data reading
2// ABOUTME: Provides read-only access to tables and rows with security validation
3
4use anyhow::{Context, Result};
5use mysql_async::{prelude::*, Conn, Row};
6
7/// List all user tables in a MySQL database
8///
9/// Queries INFORMATION_SCHEMA to discover all user tables, excluding system tables.
10/// Returns tables in alphabetical order.
11///
12/// # Arguments
13///
14/// * `conn` - MySQL connection
15/// * `db_name` - Database name to list tables from
16///
17/// # Returns
18///
19/// Vector of table names
20///
21/// # Examples
22///
23/// ```no_run
24/// # use database_replicator::mysql::{connect_mysql, reader::list_tables};
25/// # async fn example() -> anyhow::Result<()> {
26/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
27/// let tables = list_tables(&mut conn, "mydb").await?;
28/// println!("Found {} tables", tables.len());
29/// # Ok(())
30/// # }
31/// ```
32pub async fn list_tables(conn: &mut Conn, db_name: &str) -> Result<Vec<String>> {
33 tracing::info!("Listing tables from MySQL database '{}'", db_name);
34
35 let query = r#"
36 SELECT TABLE_NAME
37 FROM INFORMATION_SCHEMA.TABLES
38 WHERE TABLE_SCHEMA = ?
39 AND TABLE_TYPE = 'BASE TABLE'
40 ORDER BY TABLE_NAME
41 "#;
42
43 let tables: Vec<String> = conn
44 .exec(query, (db_name,))
45 .await
46 .with_context(|| format!("Failed to list tables from database '{}'", db_name))?;
47
48 tracing::info!("Found {} table(s) in database '{}'", tables.len(), db_name);
49
50 Ok(tables)
51}
52
53/// Get row count for a MySQL table
54///
55/// Executes COUNT(*) query to determine table size.
56///
57/// # Arguments
58///
59/// * `conn` - MySQL connection
60/// * `db_name` - Database name
61/// * `table_name` - Table name
62///
63/// # Returns
64///
65/// Number of rows in the table
66///
67/// # Examples
68///
69/// ```no_run
70/// # use database_replicator::mysql::{connect_mysql, reader::get_table_row_count};
71/// # async fn example() -> anyhow::Result<()> {
72/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
73/// let count = get_table_row_count(&mut conn, "mydb", "users").await?;
74/// println!("Table has {} rows", count);
75/// # Ok(())
76/// # }
77/// ```
78pub async fn get_table_row_count(
79 conn: &mut Conn,
80 db_name: &str,
81 table_name: &str,
82) -> Result<usize> {
83 // Validate table name to prevent injection
84 crate::jsonb::validate_table_name(table_name).context("Invalid table name for count query")?;
85
86 tracing::debug!("Getting row count for table '{}.{}'", db_name, table_name);
87
88 // Use backticks for identifiers to allow reserved words
89 let query = format!(
90 "SELECT COUNT(*) FROM {}.{}",
91 crate::utils::quote_mysql_ident(db_name),
92 crate::utils::quote_mysql_ident(table_name)
93 );
94
95 let count: Option<u64> = conn
96 .query_first(&query)
97 .await
98 .with_context(|| format!("Failed to count rows in table '{}'", table_name))?;
99
100 let count = count.unwrap_or(0) as usize;
101
102 tracing::debug!("Table '{}' has {} rows", table_name, count);
103
104 Ok(count)
105}
106
107/// Read all data from a MySQL table
108///
109/// Reads all rows from the table and returns them as MySQL Row objects.
110/// For large tables, this may consume significant memory.
111///
112/// # Arguments
113///
114/// * `conn` - MySQL connection
115/// * `db_name` - Database name
116/// * `table_name` - Table name to read from
117///
118/// # Returns
119///
120/// Vector of MySQL Row objects
121///
122/// # Security
123///
124/// Table name is validated to prevent SQL injection
125///
126/// # Examples
127///
128/// ```no_run
129/// # use database_replicator::mysql::{connect_mysql, reader::read_table_data};
130/// # async fn example() -> anyhow::Result<()> {
131/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
132/// let rows = read_table_data(&mut conn, "mydb", "users").await?;
133/// println!("Read {} rows", rows.len());
134/// # Ok(())
135/// # }
136/// ```
137pub async fn read_table_data(conn: &mut Conn, db_name: &str, table_name: &str) -> Result<Vec<Row>> {
138 // Validate table name to prevent injection
139 crate::jsonb::validate_table_name(table_name).context("Invalid table name for data reading")?;
140
141 tracing::info!("Reading all rows from table '{}.{}'", db_name, table_name);
142
143 // Use backticks for identifiers
144 let query = format!(
145 "SELECT * FROM {}.{}",
146 crate::utils::quote_mysql_ident(db_name),
147 crate::utils::quote_mysql_ident(table_name)
148 );
149
150 let rows: Vec<Row> = conn
151 .query(&query)
152 .await
153 .with_context(|| format!("Failed to read data from table '{}'", table_name))?;
154
155 tracing::info!("Read {} rows from table '{}'", rows.len(), table_name);
156
157 Ok(rows)
158}
159
#[cfg(test)]
mod tests {
    /// Ordinary identifiers (lowercase, snake_case, CamelCase, leading
    /// underscore) must be accepted by `validate_table_name`.
    #[test]
    fn test_validate_table_names() {
        // Fixed array instead of `vec!`: the list is only iterated
        // (clippy::useless_vec).
        let valid_names = ["users", "user_events", "UserData", "_private"];

        for name in valid_names {
            let result = crate::jsonb::validate_table_name(name);
            assert!(
                result.is_ok(),
                "Valid table name '{}' should be accepted",
                name
            );
        }
    }

    /// Names carrying statement separators, quotes, path traversal or SQL
    /// comment markers must be rejected by `validate_table_name`.
    #[test]
    fn test_reject_malicious_table_names() {
        let malicious_names = [
            "users; DROP TABLE users;",
            "users' OR '1'='1",
            "../etc/passwd",
            "users--",
        ];

        for name in malicious_names {
            let result = crate::jsonb::validate_table_name(name);
            assert!(
                result.is_err(),
                "Malicious table name '{}' should be rejected",
                name
            );
        }
    }
}