// database_replicator/mysql/converter.rs

// ABOUTME: MySQL to JSONB type conversion with lossless data preservation
// ABOUTME: Handles all MySQL data types including dates, decimals, and binary data

use anyhow::{Context, Result};
use mysql_async::{prelude::*, Row, Value};
use serde_json::Value as JsonValue;

8/// Convert a MySQL Value to JSON Value
9///
10/// Handles all MySQL data types with lossless conversion:
11/// - Integers → JSON numbers
12/// - Floats/Doubles → JSON numbers (non-finite as strings)
13/// - Decimals → Strings (to preserve precision)
14/// - Strings → JSON strings
15/// - Dates/Times → ISO 8601 strings in special object
16/// - Binary data → Base64 encoded in special object
17/// - NULL → JSON null
18///
19/// # Arguments
20///
21/// * `value` - MySQL Value to convert
22///
23/// # Returns
24///
25/// JSON Value representing the MySQL value
26///
27/// # Examples
28///
29/// ```
30/// # use mysql_async::Value;
31/// # use database_replicator::mysql::converter::mysql_value_to_json;
32/// let mysql_val = Value::Int(42);
33/// let json_val = mysql_value_to_json(&mysql_val).unwrap();
34/// assert_eq!(json_val, serde_json::json!(42));
35/// ```
36pub fn mysql_value_to_json(value: &Value) -> Result<JsonValue> {
37    match value {
38        Value::NULL => Ok(JsonValue::Null),
39
40        Value::Int(i) => Ok(JsonValue::Number((*i).into())),
41        Value::UInt(u) => Ok(JsonValue::Number((*u).into())),
42
43        Value::Float(f) => {
44            if f.is_finite() {
45                serde_json::Number::from_f64(*f as f64)
46                    .map(JsonValue::Number)
47                    .ok_or_else(|| anyhow::anyhow!("Failed to convert float {} to JSON number", f))
48            } else {
49                // Store non-finite as strings
50                Ok(JsonValue::String(f.to_string()))
51            }
52        }
53
54        Value::Double(d) => {
55            if d.is_finite() {
56                serde_json::Number::from_f64(*d)
57                    .map(JsonValue::Number)
58                    .ok_or_else(|| anyhow::anyhow!("Failed to convert double {} to JSON number", d))
59            } else {
60                // Store non-finite as strings
61                Ok(JsonValue::String(d.to_string()))
62            }
63        }
64
65        Value::Bytes(b) => {
66            // Try to interpret as UTF-8 string first
67            if let Ok(s) = String::from_utf8(b.clone()) {
68                Ok(JsonValue::String(s))
69            } else {
70                // If not valid UTF-8, encode as base64
71                let encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, b);
72                Ok(serde_json::json!({
73                    "_type": "binary",
74                    "data": encoded
75                }))
76            }
77        }
78
79        Value::Date(year, month, day, hour, minute, second, micro) => {
80            // Format as ISO 8601 datetime string
81            let datetime_str = format!(
82                "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z",
83                year, month, day, hour, minute, second, micro
84            );
85            Ok(serde_json::json!({
86                "_type": "datetime",
87                "value": datetime_str
88            }))
89        }
90
91        Value::Time(is_negative, days, hours, minutes, seconds, microseconds) => {
92            // Format as time duration
93            let sign = if *is_negative { "-" } else { "" };
94            let time_str = format!(
95                "{}{}d {:02}:{:02}:{:02}.{:06}",
96                sign, days, hours, minutes, seconds, microseconds
97            );
98            Ok(serde_json::json!({
99                "_type": "time",
100                "value": time_str
101            }))
102        }
103    }
104}
105
106/// Convert a MySQL Row to a JSONB-compatible JSON object
107///
108/// Converts all columns in the row to a JSON object with column names as keys.
109///
110/// # Arguments
111///
112/// * `row` - MySQL Row to convert
113/// * `column_names` - Names of columns in the row (from table schema)
114///
115/// # Returns
116///
117/// JSON object with all column values
118///
119/// # Examples
120///
121/// ```no_run
122/// # use mysql_async::Row;
123/// # use database_replicator::mysql::converter::mysql_row_to_json;
124/// # async fn example(row: Row) -> anyhow::Result<()> {
125/// let column_names = vec!["id".to_string(), "name".to_string()];
126/// let json_obj = mysql_row_to_json(&row, &column_names)?;
127/// # Ok(())
128/// # }
129/// ```
130pub fn mysql_row_to_json(row: &Row, column_names: &[String]) -> Result<JsonValue> {
131    let mut obj = serde_json::Map::new();
132
133    for (idx, col_name) in column_names.iter().enumerate() {
134        // Get value at index
135        let value: Value = row
136            .get(idx)
137            .ok_or_else(|| anyhow::anyhow!("Failed to get column {} at index {}", col_name, idx))?;
138
139        // Convert to JSON
140        let json_val = mysql_value_to_json(&value)
141            .with_context(|| format!("Failed to convert column '{}' to JSON", col_name))?;
142
143        obj.insert(col_name.clone(), json_val);
144    }
145
146    Ok(JsonValue::Object(obj))
147}
148
149/// Get column names for a MySQL table
150///
151/// Queries INFORMATION_SCHEMA to get all column names for a table.
152///
153/// # Arguments
154///
155/// * `conn` - MySQL connection
156/// * `db_name` - Database name
157/// * `table_name` - Table name
158///
159/// # Returns
160///
161/// Vector of column names in order
162///
163/// # Examples
164///
165/// ```no_run
166/// # use database_replicator::mysql::{connect_mysql, converter::get_column_names};
167/// # async fn example() -> anyhow::Result<()> {
168/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
169/// let columns = get_column_names(&mut conn, "mydb", "users").await?;
170/// # Ok(())
171/// # }
172/// ```
173pub async fn get_column_names(
174    conn: &mut mysql_async::Conn,
175    db_name: &str,
176    table_name: &str,
177) -> Result<Vec<String>> {
178    // Validate table name
179    crate::jsonb::validate_table_name(table_name).context("Invalid table name for column query")?;
180
181    let query = r#"
182        SELECT COLUMN_NAME
183        FROM INFORMATION_SCHEMA.COLUMNS
184        WHERE TABLE_SCHEMA = ?
185        AND TABLE_NAME = ?
186        ORDER BY ORDINAL_POSITION
187    "#;
188
189    let columns: Vec<String> =
190        conn.exec(query, (db_name, table_name))
191            .await
192            .with_context(|| {
193                format!(
194                    "Failed to get column names for table '{}.{}'",
195                    db_name, table_name
196                )
197            })?;
198
199    Ok(columns)
200}
201
202/// Convert an entire MySQL table to JSONB format
203///
204/// Reads all rows from the table and converts them to (id, jsonb_data) tuples.
205/// The ID is extracted from a primary key or auto-generated.
206///
207/// # Arguments
208///
209/// * `conn` - MySQL connection
210/// * `db_name` - Database name
211/// * `table_name` - Table name
212///
213/// # Returns
214///
215/// Vector of (id, json_data) tuples ready for PostgreSQL JSONB storage
216///
217/// # Examples
218///
219/// ```no_run
220/// # use database_replicator::mysql::{connect_mysql, converter::convert_table_to_jsonb};
221/// # async fn example() -> anyhow::Result<()> {
222/// let mut conn = connect_mysql("mysql://localhost:3306/mydb").await?;
223/// let rows = convert_table_to_jsonb(&mut conn, "mydb", "users").await?;
224/// println!("Converted {} rows", rows.len());
225/// # Ok(())
226/// # }
227/// ```
228pub async fn convert_table_to_jsonb(
229    conn: &mut mysql_async::Conn,
230    db_name: &str,
231    table_name: &str,
232) -> Result<Vec<(String, JsonValue)>> {
233    // Validate table name
234    crate::jsonb::validate_table_name(table_name)
235        .context("Invalid table name for JSONB conversion")?;
236
237    tracing::info!(
238        "Converting MySQL table '{}.{}' to JSONB",
239        db_name,
240        table_name
241    );
242
243    // Get column names
244    let column_names = get_column_names(conn, db_name, table_name).await?;
245
246    if column_names.is_empty() {
247        tracing::warn!("Table '{}.{}' has no columns", db_name, table_name);
248        return Ok(vec![]);
249    }
250
251    // Read all rows
252    let rows = crate::mysql::reader::read_table_data(conn, db_name, table_name).await?;
253
254    let mut result = Vec::with_capacity(rows.len());
255    let mut id_counter = 1u64;
256
257    for row in rows {
258        // Convert row to JSON
259        let json_data = mysql_row_to_json(&row, &column_names)
260            .with_context(|| format!("Failed to convert row in table '{}'", table_name))?;
261
262        // Try to extract ID from common ID column names
263        let id = if let Some(id_val) = json_data.get("id") {
264            // Use 'id' column if exists
265            id_val.to_string().trim_matches('"').to_string()
266        } else if let Some(id_val) = json_data.get("Id") {
267            // Case insensitive check
268            id_val.to_string().trim_matches('"').to_string()
269        } else if let Some(id_val) = json_data.get("ID") {
270            id_val.to_string().trim_matches('"').to_string()
271        } else {
272            // Generate sequential ID
273            let generated_id = format!("generated_{}", id_counter);
274            id_counter += 1;
275            generated_id
276        };
277
278        result.push((id, json_data));
279    }
280
281    tracing::info!(
282        "Converted {} rows from table '{}.{}'",
283        result.len(),
284        db_name,
285        table_name
286    );
287
288    Ok(result)
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_convert_null() {
        let value = Value::NULL;
        let json = mysql_value_to_json(&value).unwrap();
        assert_eq!(json, JsonValue::Null);
    }

    #[test]
    fn test_convert_int() {
        let value = Value::Int(42);
        let json = mysql_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!(42));
    }

    #[test]
    fn test_convert_negative_int() {
        let json = mysql_value_to_json(&Value::Int(-7)).unwrap();
        assert_eq!(json, serde_json::json!(-7));
    }

    #[test]
    fn test_convert_uint() {
        let value = Value::UInt(42);
        let json = mysql_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!(42));
    }

    #[test]
    fn test_convert_uint_max() {
        // u64 values above i64::MAX must survive unchanged.
        let json = mysql_value_to_json(&Value::UInt(u64::MAX)).unwrap();
        assert_eq!(json, serde_json::json!(u64::MAX));
    }

    #[test]
    fn test_convert_float() {
        // 1.5 is exactly representable in f32, so widening to f64 keeps it as 1.5.
        let json = mysql_value_to_json(&Value::Float(1.5)).unwrap();
        assert_eq!(json, serde_json::json!(1.5));
    }

    #[test]
    fn test_convert_non_finite_float() {
        let json = mysql_value_to_json(&Value::Float(f32::INFINITY)).unwrap();
        assert_eq!(json, JsonValue::String("inf".to_string()));
    }

    #[test]
    fn test_convert_double() {
        let value = Value::Double(123.456);
        let json = mysql_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!(123.456));
    }

    #[test]
    fn test_convert_non_finite_double() {
        let value = Value::Double(f64::NAN);
        let json = mysql_value_to_json(&value).unwrap();
        assert_eq!(json, JsonValue::String("NaN".to_string()));
    }

    #[test]
    fn test_convert_negative_infinite_double() {
        let json = mysql_value_to_json(&Value::Double(f64::NEG_INFINITY)).unwrap();
        assert_eq!(json, JsonValue::String("-inf".to_string()));
    }

    #[test]
    fn test_convert_string_bytes() {
        let value = Value::Bytes(b"Hello World".to_vec());
        let json = mysql_value_to_json(&value).unwrap();
        assert_eq!(json, JsonValue::String("Hello World".to_string()));
    }

    #[test]
    fn test_convert_empty_bytes() {
        let json = mysql_value_to_json(&Value::Bytes(Vec::new())).unwrap();
        assert_eq!(json, JsonValue::String(String::new()));
    }

    #[test]
    fn test_convert_binary_bytes() {
        let value = Value::Bytes(vec![0xFF, 0xFE, 0xFD]);
        let json = mysql_value_to_json(&value).unwrap();
        assert!(json.is_object());
        assert_eq!(json["_type"], "binary");
        // Standard base64 of FF FE FD.
        assert_eq!(json["data"], "//79");
    }

    #[test]
    fn test_convert_datetime() {
        let value = Value::Date(2024, 1, 15, 10, 30, 45, 123456);
        let json = mysql_value_to_json(&value).unwrap();
        assert!(json.is_object());
        assert_eq!(json["_type"], "datetime");
        assert_eq!(json["value"], "2024-01-15T10:30:45.123456Z");
    }

    #[test]
    fn test_convert_date_only() {
        let json = mysql_value_to_json(&Value::Date(2024, 2, 29, 0, 0, 0, 0)).unwrap();
        assert_eq!(json["_type"], "datetime");
        assert_eq!(json["value"], "2024-02-29T00:00:00.000000Z");
    }

    #[test]
    fn test_convert_time() {
        let value = Value::Time(false, 1, 10, 30, 45, 123456);
        let json = mysql_value_to_json(&value).unwrap();
        assert!(json.is_object());
        assert_eq!(json["_type"], "time");
        assert!(json["value"].as_str().unwrap().contains("1d 10:30:45"));
    }

    #[test]
    fn test_convert_negative_time() {
        let json = mysql_value_to_json(&Value::Time(true, 0, 1, 2, 3, 4)).unwrap();
        assert_eq!(json["_type"], "time");
        assert_eq!(json["value"], "-0d 01:02:03.000004");
    }
}