1use async_trait::async_trait;
2use sqlx::{MySqlPool, Row};
3use crate::error::ReflectError;
4use crate::executor::{Executor, TableInfo};
5use crate::metadata::{Column, ForeignKey, Index, PrimaryKey, SqlType};
6
7pub struct MysqlExecutor {
8 pool: MySqlPool,
9}
10
11impl MysqlExecutor {
12 pub fn new(pool: MySqlPool) -> Self {
13 Self { pool }
14 }
15}
16
17#[async_trait]
18impl Executor for MysqlExecutor {
19 async fn fetch_tables(
20 &self,
21 schema: Option<&str>,
22 include_views: bool,
23 ) -> Result<Vec<TableInfo>, ReflectError> {
24 let database: Option<String> = if let Some(s) = schema {
25 Some(s.to_string())
26 } else {
27 let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
28 row.try_get(0)?
29 };
30 let schema_name = database.unwrap_or_else(|| "".to_string());
31
32 let mut query_str = "SELECT CAST(table_name AS CHAR), CAST(table_type AS CHAR) FROM information_schema.tables WHERE table_schema = ?".to_string();
33 if !include_views {
34 query_str.push_str(" AND table_type = 'BASE TABLE'");
35 }
36
37 let rows = sqlx::query(&query_str)
38 .bind(&schema_name)
39 .fetch_all(&self.pool)
40 .await?;
41
42 let mut infos = Vec::new();
43 for row in rows {
44 let name: String = row.try_get(0)?;
45 let ttype: String = row.try_get(1)?;
46 infos.push(TableInfo {
47 name,
48 is_view: ttype == "VIEW",
49 });
50 }
51 Ok(infos)
52 }
53
54 async fn fetch_columns(
55 &self,
56 table: &str,
57 schema: Option<&str>,
58 ) -> Result<Vec<Column>, ReflectError> {
59 let database: Option<String> = if let Some(s) = schema {
60 Some(s.to_string())
61 } else {
62 let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
63 row.try_get(0)?
64 };
65 let schema_name = database.unwrap_or_else(|| "".to_string());
66
67 let rows = sqlx::query(
68 "SELECT CAST(column_name AS CHAR), CAST(data_type AS CHAR), CAST(is_nullable AS CHAR), CAST(column_default AS CHAR), character_maximum_length
69 FROM information_schema.columns
70 WHERE table_schema = ? AND table_name = ?
71 ORDER BY ordinal_position"
72 )
73 .bind(&schema_name)
74 .bind(table)
75 .fetch_all(&self.pool)
76 .await?;
77
78 let mut columns = Vec::new();
79 for row in rows {
80 let name: String = row.try_get(0)?;
81 let data_type_str: String = row.try_get(1)?;
82 let is_nullable_str: String = row.try_get(2)?;
83
84 let default: Option<String> = match row.try_get::<Option<String>, _>(3) {
85 Ok(val) => val,
86 Err(_) => None,
87 };
88
89 let char_max: Option<i64> = row.try_get(4).unwrap_or(None);
92
93 let nullable = is_nullable_str == "YES";
94 let dt = data_type_str.to_uppercase();
95
96 let data_type = match dt.as_str() {
97 "INT" | "INTEGER" | "TINYINT" | "SMALLINT" | "MEDIUMINT" => SqlType::Integer,
98 "BIGINT" => SqlType::BigInt,
99 "FLOAT" => SqlType::Float,
100 "DOUBLE" | "DECIMAL" | "NUMERIC" => SqlType::Double,
101 "TEXT" | "LONGTEXT" | "MEDIUMTEXT" | "TINYTEXT" => SqlType::Text,
102 "VARCHAR" | "CHAR" => {
103 SqlType::Varchar(char_max.map(|v| v as u32))
104 }
105 "DATE" => SqlType::Date,
106 "TIMESTAMP" | "DATETIME" => SqlType::Timestamp,
107 "JSON" => SqlType::Json,
108 _ => SqlType::Custom(dt.clone()),
109 };
110
111 columns.push(Column {
112 name,
113 data_type,
114 nullable,
115 default,
116 });
117 }
118
119 Ok(columns)
120 }
121
122 async fn fetch_primary_key(
123 &self,
124 table: &str,
125 schema: Option<&str>,
126 ) -> Result<Option<PrimaryKey>, ReflectError> {
127 let database: Option<String> = if let Some(s) = schema {
128 Some(s.to_string())
129 } else {
130 let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
131 row.try_get(0)?
132 };
133 let schema_name = database.unwrap_or_else(|| "".to_string());
134
135 let rows = sqlx::query(
136 "SELECT CAST(column_name AS CHAR)
137 FROM information_schema.key_column_usage
138 WHERE table_schema = ? AND table_name = ? AND constraint_name = 'PRIMARY'
139 ORDER BY ordinal_position"
140 )
141 .bind(&schema_name)
142 .bind(table)
143 .fetch_all(&self.pool)
144 .await?;
145
146 if rows.is_empty() {
147 return Ok(None);
148 }
149
150 let mut columns = Vec::new();
151 for row in rows {
152 let col: String = row.try_get(0)?;
153 columns.push(col);
154 }
155
156 Ok(Some(PrimaryKey { columns }))
157 }
158
159 async fn fetch_foreign_keys(
160 &self,
161 table: &str,
162 schema: Option<&str>,
163 ) -> Result<Vec<ForeignKey>, ReflectError> {
164 let database: Option<String> = if let Some(s) = schema {
165 Some(s.to_string())
166 } else {
167 let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
168 row.try_get(0)?
169 };
170 let schema_name = database.unwrap_or_else(|| "".to_string());
171
172 let rows = sqlx::query(
173 "SELECT CAST(column_name AS CHAR), CAST(referenced_table_name AS CHAR), CAST(referenced_column_name AS CHAR)
174 FROM information_schema.key_column_usage
175 WHERE table_schema = ? AND table_name = ? AND referenced_table_name IS NOT NULL"
176 )
177 .bind(&schema_name)
178 .bind(table)
179 .fetch_all(&self.pool)
180 .await?;
181
182 let mut fks = Vec::new();
183 for row in rows {
184 let column: String = row.try_get(0)?;
185 let referenced_table: String = row.try_get(1)?;
186 let referenced_column: String = row.try_get(2)?;
187
188 fks.push(ForeignKey {
189 column,
190 referenced_table,
191 referenced_column,
192 });
193 }
194
195 Ok(fks)
196 }
197
198 async fn fetch_indexes(
199 &self,
200 table: &str,
201 schema: Option<&str>,
202 ) -> Result<Vec<Index>, ReflectError> {
203 let database: Option<String> = if let Some(s) = schema {
204 Some(s.to_string())
205 } else {
206 let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
207 row.try_get(0)?
208 };
209 let schema_name = database.unwrap_or_else(|| "".to_string());
210
211 let rows = sqlx::query(
212 "SELECT CAST(index_name AS CHAR), non_unique, CAST(column_name AS CHAR)
213 FROM information_schema.statistics
214 WHERE table_schema = ? AND table_name = ? AND index_name != 'PRIMARY'
215 ORDER BY index_name, seq_in_index"
216 )
217 .bind(&schema_name)
218 .bind(table)
219 .fetch_all(&self.pool)
220 .await?;
221
222 let mut idx_map: std::collections::HashMap<String, Index> = std::collections::HashMap::new();
223
224 for row in rows {
225 let idx_name: String = row.try_get(0)?;
226 let non_unique: i64 = row.try_get(1)?;
227 let is_unique = non_unique == 0;
228 let col_name: String = row.try_get(2)?;
229
230 let eg = idx_map.entry(idx_name.clone()).or_insert(Index {
231 name: idx_name,
232 columns: Vec::new(),
233 unique: is_unique,
234 });
235 eg.columns.push(col_name);
236 }
237
238 Ok(idx_map.into_values().collect())
239 }
240}