// bubble_db/mysql.rs

use crate::{DatabaseConfig, DatabaseConnection, DbResult};
use async_trait::async_trait;
use mysql_async::{Conn, prelude::Queryable};
use std::collections::HashMap;
use tokio::sync::Mutex;

7#[derive(Debug)]
8pub struct MySqlConnection {
9    conn: Mutex<Conn>,
10}
11
12impl MySqlConnection {
13    pub async fn connect(config: &DatabaseConfig) -> DbResult<Self> {
14        let conn = Conn::new(
15            mysql_async::Opts::from_url(&config.connection_string()).map_err(|e| e.to_string())?,
16        )
17        .await
18        .map_err(|e| e.to_string())?;
19
20        Ok(Self {
21            conn: Mutex::new(conn),
22        })
23    }
24}
25
26#[async_trait]
27impl DatabaseConnection for MySqlConnection {
28    async fn execute(&self, sql: &str) -> DbResult<u64> {
29        let mut conn = self.conn.lock().await;
30        conn.query_drop(sql).await.map_err(|e| e.to_string())?;
31        let result = conn
32            .query_iter("SELECT ROW_COUNT()")
33            .await
34            .map_err(|e| e.to_string())?;
35        let rows = result
36            .map_and_drop(|row| row)
37            .await
38            .map_err(|e| e.to_string())?;
39
40        if let Some(row) = rows.first() {
41            let affected: i64 = row.get(0).unwrap_or(0);
42            Ok(affected.max(0) as u64)
43        } else {
44            Ok(0)
45        }
46    }
47
48    async fn query(&self, sql: &str) -> DbResult<String> {
49        let mut conn = self.conn.lock().await;
50        let result = conn.query_iter(sql).await.map_err(|e| e.to_string())?;
51        let rows = result
52            .map_and_drop(|row| row)
53            .await
54            .map_err(|e| e.to_string())?;
55        let mut results = Vec::new();
56        for row in rows {
57            let mut map = HashMap::new();
58            for (i, column) in row.columns_ref().iter().enumerate() {
59                let name = column.name_str().to_string();
60                let opt_value: Option<mysql_async::Value> = row.get(i);
61                let value = match opt_value {
62                    Some(mysql_async::Value::Int(i)) => i.to_string(),
63                    Some(mysql_async::Value::UInt(u)) => u.to_string(),
64                    Some(mysql_async::Value::Float(f)) => f.to_string(),
65                    Some(mysql_async::Value::Double(d)) => d.to_string(),
66                    Some(mysql_async::Value::Bytes(bytes)) => {
67                        String::from_utf8_lossy(&bytes).to_string()
68                    }
69                    Some(mysql_async::Value::Date(
70                        year,
71                        month,
72                        day,
73                        hour,
74                        minute,
75                        second,
76                        micro,
77                    )) => {
78                        format!(
79                            "{}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
80                            year as i32, month, day, hour, minute, second, micro
81                        )
82                    }
83                    Some(mysql_async::Value::Time(neg, days, hours, minutes, seconds, micros)) => {
84                        let total = (days as i64 * 86400
85                            + hours as i64 * 3600
86                            + minutes as i64 * 60
87                            + seconds as i64) as i64;
88                        let total = if neg { -total } else { total };
89                        format!(
90                            "{} days {}:{:02}:{:02}.{:06}",
91                            days, hours, minutes, seconds, micros
92                        )
93                    }
94                    None | Some(mysql_async::Value::NULL) => "".to_string(),
95                };
96                map.insert(name, value);
97            }
98            results.push(map);
99        }
100        serde_json::to_string(&results).map_err(|e| e.to_string())
101    }
102
103    async fn query_one(&self, sql: &str) -> DbResult<String> {
104        let mut conn = self.conn.lock().await;
105        let result = conn.query_iter(sql).await.map_err(|e| e.to_string())?;
106        let rows = result
107            .map_and_drop(|row| row)
108            .await
109            .map_err(|e| e.to_string())?;
110        if let Some(row) = rows.first() {
111            let mut map = HashMap::new();
112            for (i, column) in row.columns_ref().iter().enumerate() {
113                let name = column.name_str().to_string();
114                let opt_value: Option<mysql_async::Value> = row.get(i);
115                let value = match opt_value {
116                    Some(mysql_async::Value::Int(i)) => i.to_string(),
117                    Some(mysql_async::Value::UInt(u)) => u.to_string(),
118                    Some(mysql_async::Value::Float(f)) => f.to_string(),
119                    Some(mysql_async::Value::Double(d)) => d.to_string(),
120                    Some(mysql_async::Value::Bytes(bytes)) => {
121                        String::from_utf8_lossy(&bytes).to_string()
122                    }
123                    Some(mysql_async::Value::Date(
124                        year,
125                        month,
126                        day,
127                        hour,
128                        minute,
129                        second,
130                        micro,
131                    )) => {
132                        format!(
133                            "{}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
134                            year as i32, month, day, hour, minute, second, micro
135                        )
136                    }
137                    Some(mysql_async::Value::Time(neg, days, hours, minutes, seconds, micros)) => {
138                        let total = (days as i64 * 86400
139                            + hours as i64 * 3600
140                            + minutes as i64 * 60
141                            + seconds as i64) as i64;
142                        let total = if neg { -total } else { total };
143                        format!(
144                            "{} days {}:{:02}:{:02}.{:06}",
145                            days, hours, minutes, seconds, micros
146                        )
147                    }
148                    None | Some(mysql_async::Value::NULL) => "".to_string(),
149                };
150                map.insert(name, value);
151            }
152            serde_json::to_string(&map).map_err(|e| e.to_string())
153        } else {
154            Err("No rows found".to_string())
155        }
156    }
157
158    async fn insert_batch(&self, table: &str, json_data: &str) -> DbResult<u64> {
159        let items: Vec<serde_json::Value> = serde_json::from_str(json_data)
160            .map_err(|e| format!("Failed to parse JSON data: {}", e))?;
161        if items.is_empty() {
162            return Ok(0);
163        }
164        let mut conn = self.conn.lock().await;
165        let mut count = 0;
166        conn.query_drop("START TRANSACTION")
167            .await
168            .map_err(|e| e.to_string())?;
169        for item in items {
170            let value = crate::to_sql_value(&item)?;
171            let sql = format!("INSERT INTO {} VALUES ({})", table, value);
172            conn.query_drop(&sql).await.map_err(|e| e.to_string())?;
173            count += 1;
174        }
175        conn.query_drop("COMMIT").await.map_err(|e| e.to_string())?;
176        Ok(count)
177    }
178}