1use crate::{DatabaseConfig, DatabaseConnection, DbResult};
2use async_trait::async_trait;
3use mysql_async::{Conn, prelude::Queryable};
4use std::collections::HashMap;
5use tokio::sync::Mutex;
6
7#[derive(Debug)]
8pub struct MySqlConnection {
9 conn: Mutex<Conn>,
10}
11
12impl MySqlConnection {
13 pub async fn connect(config: &DatabaseConfig) -> DbResult<Self> {
14 let conn = Conn::new(
15 mysql_async::Opts::from_url(&config.connection_string()).map_err(|e| e.to_string())?,
16 )
17 .await
18 .map_err(|e| e.to_string())?;
19
20 Ok(Self {
21 conn: Mutex::new(conn),
22 })
23 }
24}
25
26#[async_trait]
27impl DatabaseConnection for MySqlConnection {
28 async fn execute(&self, sql: &str) -> DbResult<u64> {
29 let mut conn = self.conn.lock().await;
30 conn.query_drop(sql).await.map_err(|e| e.to_string())?;
31 let result = conn
32 .query_iter("SELECT ROW_COUNT()")
33 .await
34 .map_err(|e| e.to_string())?;
35 let rows = result
36 .map_and_drop(|row| row)
37 .await
38 .map_err(|e| e.to_string())?;
39
40 if let Some(row) = rows.first() {
41 let affected: i64 = row.get(0).unwrap_or(0);
42 Ok(affected.max(0) as u64)
43 } else {
44 Ok(0)
45 }
46 }
47
48 async fn query(&self, sql: &str) -> DbResult<String> {
49 let mut conn = self.conn.lock().await;
50 let result = conn.query_iter(sql).await.map_err(|e| e.to_string())?;
51 let rows = result
52 .map_and_drop(|row| row)
53 .await
54 .map_err(|e| e.to_string())?;
55 let mut results = Vec::new();
56 for row in rows {
57 let mut map = HashMap::new();
58 for (i, column) in row.columns_ref().iter().enumerate() {
59 let name = column.name_str().to_string();
60 let opt_value: Option<mysql_async::Value> = row.get(i);
61 let value = match opt_value {
62 Some(mysql_async::Value::Int(i)) => i.to_string(),
63 Some(mysql_async::Value::UInt(u)) => u.to_string(),
64 Some(mysql_async::Value::Float(f)) => f.to_string(),
65 Some(mysql_async::Value::Double(d)) => d.to_string(),
66 Some(mysql_async::Value::Bytes(bytes)) => {
67 String::from_utf8_lossy(&bytes).to_string()
68 }
69 Some(mysql_async::Value::Date(
70 year,
71 month,
72 day,
73 hour,
74 minute,
75 second,
76 micro,
77 )) => {
78 format!(
79 "{}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
80 year as i32, month, day, hour, minute, second, micro
81 )
82 }
83 Some(mysql_async::Value::Time(neg, days, hours, minutes, seconds, micros)) => {
84 let total = (days as i64 * 86400
85 + hours as i64 * 3600
86 + minutes as i64 * 60
87 + seconds as i64) as i64;
88 let total = if neg { -total } else { total };
89 format!(
90 "{} days {}:{:02}:{:02}.{:06}",
91 days, hours, minutes, seconds, micros
92 )
93 }
94 None | Some(mysql_async::Value::NULL) => "".to_string(),
95 };
96 map.insert(name, value);
97 }
98 results.push(map);
99 }
100 serde_json::to_string(&results).map_err(|e| e.to_string())
101 }
102
103 async fn query_one(&self, sql: &str) -> DbResult<String> {
104 let mut conn = self.conn.lock().await;
105 let result = conn.query_iter(sql).await.map_err(|e| e.to_string())?;
106 let rows = result
107 .map_and_drop(|row| row)
108 .await
109 .map_err(|e| e.to_string())?;
110 if let Some(row) = rows.first() {
111 let mut map = HashMap::new();
112 for (i, column) in row.columns_ref().iter().enumerate() {
113 let name = column.name_str().to_string();
114 let opt_value: Option<mysql_async::Value> = row.get(i);
115 let value = match opt_value {
116 Some(mysql_async::Value::Int(i)) => i.to_string(),
117 Some(mysql_async::Value::UInt(u)) => u.to_string(),
118 Some(mysql_async::Value::Float(f)) => f.to_string(),
119 Some(mysql_async::Value::Double(d)) => d.to_string(),
120 Some(mysql_async::Value::Bytes(bytes)) => {
121 String::from_utf8_lossy(&bytes).to_string()
122 }
123 Some(mysql_async::Value::Date(
124 year,
125 month,
126 day,
127 hour,
128 minute,
129 second,
130 micro,
131 )) => {
132 format!(
133 "{}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
134 year as i32, month, day, hour, minute, second, micro
135 )
136 }
137 Some(mysql_async::Value::Time(neg, days, hours, minutes, seconds, micros)) => {
138 let total = (days as i64 * 86400
139 + hours as i64 * 3600
140 + minutes as i64 * 60
141 + seconds as i64) as i64;
142 let total = if neg { -total } else { total };
143 format!(
144 "{} days {}:{:02}:{:02}.{:06}",
145 days, hours, minutes, seconds, micros
146 )
147 }
148 None | Some(mysql_async::Value::NULL) => "".to_string(),
149 };
150 map.insert(name, value);
151 }
152 serde_json::to_string(&map).map_err(|e| e.to_string())
153 } else {
154 Err("No rows found".to_string())
155 }
156 }
157
158 async fn insert_batch(&self, table: &str, json_data: &str) -> DbResult<u64> {
159 let items: Vec<serde_json::Value> = serde_json::from_str(json_data)
160 .map_err(|e| format!("Failed to parse JSON data: {}", e))?;
161 if items.is_empty() {
162 return Ok(0);
163 }
164 let mut conn = self.conn.lock().await;
165 let mut count = 0;
166 conn.query_drop("START TRANSACTION")
167 .await
168 .map_err(|e| e.to_string())?;
169 for item in items {
170 let value = crate::to_sql_value(&item)?;
171 let sql = format!("INSERT INTO {} VALUES ({})", table, value);
172 conn.query_drop(&sql).await.map_err(|e| e.to_string())?;
173 count += 1;
174 }
175 conn.query_drop("COMMIT").await.map_err(|e| e.to_string())?;
176 Ok(count)
177 }
178}