database_replicator/mysql/
converter.rs1use anyhow::{Context, Result};
5use mysql_async::{prelude::*, Row, Value};
6use serde_json::Value as JsonValue;
7
8pub fn mysql_value_to_json(value: &Value) -> Result<JsonValue> {
37 match value {
38 Value::NULL => Ok(JsonValue::Null),
39
40 Value::Int(i) => Ok(JsonValue::Number((*i).into())),
41 Value::UInt(u) => Ok(JsonValue::Number((*u).into())),
42
43 Value::Float(f) => {
44 if f.is_finite() {
45 serde_json::Number::from_f64(*f as f64)
46 .map(JsonValue::Number)
47 .ok_or_else(|| anyhow::anyhow!("Failed to convert float {} to JSON number", f))
48 } else {
49 Ok(JsonValue::String(f.to_string()))
51 }
52 }
53
54 Value::Double(d) => {
55 if d.is_finite() {
56 serde_json::Number::from_f64(*d)
57 .map(JsonValue::Number)
58 .ok_or_else(|| anyhow::anyhow!("Failed to convert double {} to JSON number", d))
59 } else {
60 Ok(JsonValue::String(d.to_string()))
62 }
63 }
64
65 Value::Bytes(b) => {
66 if let Ok(s) = String::from_utf8(b.clone()) {
68 Ok(JsonValue::String(s))
69 } else {
70 let encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, b);
72 Ok(serde_json::json!({
73 "_type": "binary",
74 "data": encoded
75 }))
76 }
77 }
78
79 Value::Date(year, month, day, hour, minute, second, micro) => {
80 let datetime_str = format!(
82 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z",
83 year, month, day, hour, minute, second, micro
84 );
85 Ok(serde_json::json!({
86 "_type": "datetime",
87 "value": datetime_str
88 }))
89 }
90
91 Value::Time(is_negative, days, hours, minutes, seconds, microseconds) => {
92 let sign = if *is_negative { "-" } else { "" };
94 let time_str = format!(
95 "{}{}d {:02}:{:02}:{:02}.{:06}",
96 sign, days, hours, minutes, seconds, microseconds
97 );
98 Ok(serde_json::json!({
99 "_type": "time",
100 "value": time_str
101 }))
102 }
103 }
104}
105
106pub fn mysql_row_to_json(row: &Row, column_names: &[String]) -> Result<JsonValue> {
131 let mut obj = serde_json::Map::new();
132
133 for (idx, col_name) in column_names.iter().enumerate() {
134 let value: Value = row
136 .get(idx)
137 .ok_or_else(|| anyhow::anyhow!("Failed to get column {} at index {}", col_name, idx))?;
138
139 let json_val = mysql_value_to_json(&value)
141 .with_context(|| format!("Failed to convert column '{}' to JSON", col_name))?;
142
143 obj.insert(col_name.clone(), json_val);
144 }
145
146 Ok(JsonValue::Object(obj))
147}
148
149pub async fn get_column_names(
174 conn: &mut mysql_async::Conn,
175 db_name: &str,
176 table_name: &str,
177) -> Result<Vec<String>> {
178 crate::jsonb::validate_table_name(table_name).context("Invalid table name for column query")?;
180
181 let query = r#"
182 SELECT COLUMN_NAME
183 FROM INFORMATION_SCHEMA.COLUMNS
184 WHERE TABLE_SCHEMA = ?
185 AND TABLE_NAME = ?
186 ORDER BY ORDINAL_POSITION
187 "#;
188
189 let columns: Vec<String> =
190 conn.exec(query, (db_name, table_name))
191 .await
192 .with_context(|| {
193 format!(
194 "Failed to get column names for table '{}.{}'",
195 db_name, table_name
196 )
197 })?;
198
199 Ok(columns)
200}
201
202pub async fn convert_table_to_jsonb(
229 conn: &mut mysql_async::Conn,
230 db_name: &str,
231 table_name: &str,
232) -> Result<Vec<(String, JsonValue)>> {
233 crate::jsonb::validate_table_name(table_name)
235 .context("Invalid table name for JSONB conversion")?;
236
237 tracing::info!(
238 "Converting MySQL table '{}.{}' to JSONB",
239 db_name,
240 table_name
241 );
242
243 let column_names = get_column_names(conn, db_name, table_name).await?;
245
246 if column_names.is_empty() {
247 tracing::warn!("Table '{}.{}' has no columns", db_name, table_name);
248 return Ok(vec![]);
249 }
250
251 let rows = crate::mysql::reader::read_table_data(conn, db_name, table_name).await?;
253
254 let mut result = Vec::with_capacity(rows.len());
255 let mut id_counter = 1u64;
256
257 for row in rows {
258 let json_data = mysql_row_to_json(&row, &column_names)
260 .with_context(|| format!("Failed to convert row in table '{}'", table_name))?;
261
262 let id = if let Some(id_val) = json_data.get("id") {
264 id_val.to_string().trim_matches('"').to_string()
266 } else if let Some(id_val) = json_data.get("Id") {
267 id_val.to_string().trim_matches('"').to_string()
269 } else if let Some(id_val) = json_data.get("ID") {
270 id_val.to_string().trim_matches('"').to_string()
271 } else {
272 let generated_id = format!("generated_{}", id_counter);
274 id_counter += 1;
275 generated_id
276 };
277
278 result.push((id, json_data));
279 }
280
281 tracing::info!(
282 "Converted {} rows from table '{}.{}'",
283 result.len(),
284 db_name,
285 table_name
286 );
287
288 Ok(result)
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294
295 #[test]
296 fn test_convert_null() {
297 let value = Value::NULL;
298 let json = mysql_value_to_json(&value).unwrap();
299 assert_eq!(json, JsonValue::Null);
300 }
301
302 #[test]
303 fn test_convert_int() {
304 let value = Value::Int(42);
305 let json = mysql_value_to_json(&value).unwrap();
306 assert_eq!(json, serde_json::json!(42));
307 }
308
309 #[test]
310 fn test_convert_uint() {
311 let value = Value::UInt(42);
312 let json = mysql_value_to_json(&value).unwrap();
313 assert_eq!(json, serde_json::json!(42));
314 }
315
316 #[test]
317 fn test_convert_double() {
318 let value = Value::Double(123.456);
319 let json = mysql_value_to_json(&value).unwrap();
320 assert_eq!(json, serde_json::json!(123.456));
321 }
322
323 #[test]
324 fn test_convert_string_bytes() {
325 let value = Value::Bytes(b"Hello World".to_vec());
326 let json = mysql_value_to_json(&value).unwrap();
327 assert_eq!(json, JsonValue::String("Hello World".to_string()));
328 }
329
330 #[test]
331 fn test_convert_binary_bytes() {
332 let value = Value::Bytes(vec![0xFF, 0xFE, 0xFD]);
333 let json = mysql_value_to_json(&value).unwrap();
334 assert!(json.is_object());
335 assert_eq!(json["_type"], "binary");
336 }
337
338 #[test]
339 fn test_convert_datetime() {
340 let value = Value::Date(2024, 1, 15, 10, 30, 45, 123456);
341 let json = mysql_value_to_json(&value).unwrap();
342 assert!(json.is_object());
343 assert_eq!(json["_type"], "datetime");
344 assert_eq!(json["value"], "2024-01-15T10:30:45.123456Z");
345 }
346
347 #[test]
348 fn test_convert_time() {
349 let value = Value::Time(false, 1, 10, 30, 45, 123456);
350 let json = mysql_value_to_json(&value).unwrap();
351 assert!(json.is_object());
352 assert_eq!(json["_type"], "time");
353 assert!(json["value"].as_str().unwrap().contains("1d 10:30:45"));
354 }
355
356 #[test]
357 fn test_convert_non_finite_double() {
358 let value = Value::Double(f64::NAN);
359 let json = mysql_value_to_json(&value).unwrap();
360 assert_eq!(json, JsonValue::String("NaN".to_string()));
361 }
362}