Skip to main content

reflect_db/
mysql.rs

1use async_trait::async_trait;
2use sqlx::{MySqlPool, Row};
3use crate::error::ReflectError;
4use crate::executor::{Executor, TableInfo};
5use crate::metadata::{Column, ForeignKey, Index, PrimaryKey, SqlType};
6
7pub struct MysqlExecutor {
8    pool: MySqlPool,
9}
10
11impl MysqlExecutor {
12    pub fn new(pool: MySqlPool) -> Self {
13        Self { pool }
14    }
15}
16
17#[async_trait]
18impl Executor for MysqlExecutor {
19    async fn fetch_tables(
20        &self,
21        schema: Option<&str>,
22        include_views: bool,
23    ) -> Result<Vec<TableInfo>, ReflectError> {
24        let database: Option<String> = if let Some(s) = schema {
25            Some(s.to_string())
26        } else {
27            let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
28            row.try_get(0)?
29        };
30        let schema_name = database.unwrap_or_else(|| "".to_string());
31
32        let mut query_str = "SELECT CAST(table_name AS CHAR), CAST(table_type AS CHAR) FROM information_schema.tables WHERE table_schema = ?".to_string();
33        if !include_views {
34            query_str.push_str(" AND table_type = 'BASE TABLE'");
35        }
36
37        let rows = sqlx::query(&query_str)
38            .bind(&schema_name)
39            .fetch_all(&self.pool)
40            .await?;
41
42        let mut infos = Vec::new();
43        for row in rows {
44            let name: String = row.try_get(0)?;
45            let ttype: String = row.try_get(1)?;
46            infos.push(TableInfo {
47                name,
48                is_view: ttype == "VIEW",
49            });
50        }
51        Ok(infos)
52    }
53
54    async fn fetch_columns(
55        &self,
56        table: &str,
57        schema: Option<&str>,
58    ) -> Result<Vec<Column>, ReflectError> {
59        let database: Option<String> = if let Some(s) = schema {
60            Some(s.to_string())
61        } else {
62            let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
63            row.try_get(0)?
64        };
65        let schema_name = database.unwrap_or_else(|| "".to_string());
66
67        let rows = sqlx::query(
68            "SELECT CAST(column_name AS CHAR), CAST(data_type AS CHAR), CAST(is_nullable AS CHAR), CAST(column_default AS CHAR), character_maximum_length 
69             FROM information_schema.columns 
70             WHERE table_schema = ? AND table_name = ? 
71             ORDER BY ordinal_position"
72        )
73        .bind(&schema_name)
74        .bind(table)
75        .fetch_all(&self.pool)
76        .await?;
77
78        let mut columns = Vec::new();
79        for row in rows {
80            let name: String = row.try_get(0)?;
81            let data_type_str: String = row.try_get(1)?;
82            let is_nullable_str: String = row.try_get(2)?;
83            
84            let default: Option<String> = match row.try_get::<Option<String>, _>(3) {
85                Ok(val) => val,
86                Err(_) => None,
87            };
88
89            // in MySQL character_maximum_length is usually u32/i64 type based on sqlx driver mapping
90            // we will fallback if we get a type error
91            let char_max: Option<i64> = row.try_get(4).unwrap_or(None);
92
93            let nullable = is_nullable_str == "YES";
94            let dt = data_type_str.to_uppercase();
95
96            let data_type = match dt.as_str() {
97                "INT" | "INTEGER" | "TINYINT" | "SMALLINT" | "MEDIUMINT" => SqlType::Integer,
98                "BIGINT" => SqlType::BigInt,
99                "FLOAT" => SqlType::Float,
100                "DOUBLE" | "DECIMAL" | "NUMERIC" => SqlType::Double,
101                "TEXT" | "LONGTEXT" | "MEDIUMTEXT" | "TINYTEXT" => SqlType::Text,
102                "VARCHAR" | "CHAR" => {
103                    SqlType::Varchar(char_max.map(|v| v as u32))
104                }
105                "DATE" => SqlType::Date,
106                "TIMESTAMP" | "DATETIME" => SqlType::Timestamp,
107                "JSON" => SqlType::Json,
108                _ => SqlType::Custom(dt.clone()),
109            };
110
111            columns.push(Column {
112                name,
113                data_type,
114                nullable,
115                default,
116            });
117        }
118
119        Ok(columns)
120    }
121
122    async fn fetch_primary_key(
123        &self,
124        table: &str,
125        schema: Option<&str>,
126    ) -> Result<Option<PrimaryKey>, ReflectError> {
127        let database: Option<String> = if let Some(s) = schema {
128            Some(s.to_string())
129        } else {
130            let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
131            row.try_get(0)?
132        };
133        let schema_name = database.unwrap_or_else(|| "".to_string());
134
135        let rows = sqlx::query(
136            "SELECT CAST(column_name AS CHAR) 
137             FROM information_schema.key_column_usage 
138             WHERE table_schema = ? AND table_name = ? AND constraint_name = 'PRIMARY'
139             ORDER BY ordinal_position"
140        )
141        .bind(&schema_name)
142        .bind(table)
143        .fetch_all(&self.pool)
144        .await?;
145
146        if rows.is_empty() {
147            return Ok(None);
148        }
149
150        let mut columns = Vec::new();
151        for row in rows {
152            let col: String = row.try_get(0)?;
153            columns.push(col);
154        }
155
156        Ok(Some(PrimaryKey { columns }))
157    }
158
159    async fn fetch_foreign_keys(
160        &self,
161        table: &str,
162        schema: Option<&str>,
163    ) -> Result<Vec<ForeignKey>, ReflectError> {
164        let database: Option<String> = if let Some(s) = schema {
165            Some(s.to_string())
166        } else {
167            let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
168            row.try_get(0)?
169        };
170        let schema_name = database.unwrap_or_else(|| "".to_string());
171        
172        let rows = sqlx::query(
173            "SELECT CAST(column_name AS CHAR), CAST(referenced_table_name AS CHAR), CAST(referenced_column_name AS CHAR) 
174             FROM information_schema.key_column_usage 
175             WHERE table_schema = ? AND table_name = ? AND referenced_table_name IS NOT NULL"
176        )
177        .bind(&schema_name)
178        .bind(table)
179        .fetch_all(&self.pool)
180        .await?;
181
182        let mut fks = Vec::new();
183        for row in rows {
184            let column: String = row.try_get(0)?;
185            let referenced_table: String = row.try_get(1)?;
186            let referenced_column: String = row.try_get(2)?;
187
188            fks.push(ForeignKey {
189                column,
190                referenced_table,
191                referenced_column,
192            });
193        }
194
195        Ok(fks)
196    }
197
198    async fn fetch_indexes(
199        &self,
200        table: &str,
201        schema: Option<&str>,
202    ) -> Result<Vec<Index>, ReflectError> {
203        let database: Option<String> = if let Some(s) = schema {
204            Some(s.to_string())
205        } else {
206            let row = sqlx::query("SELECT DATABASE()").fetch_one(&self.pool).await?;
207            row.try_get(0)?
208        };
209        let schema_name = database.unwrap_or_else(|| "".to_string());
210
211        let rows = sqlx::query(
212            "SELECT CAST(index_name AS CHAR), non_unique, CAST(column_name AS CHAR) 
213             FROM information_schema.statistics 
214             WHERE table_schema = ? AND table_name = ? AND index_name != 'PRIMARY'
215             ORDER BY index_name, seq_in_index"
216        )
217        .bind(&schema_name)
218        .bind(table)
219        .fetch_all(&self.pool)
220        .await?;
221
222        let mut idx_map: std::collections::HashMap<String, Index> = std::collections::HashMap::new();
223
224        for row in rows {
225            let idx_name: String = row.try_get(0)?;
226            let non_unique: i64 = row.try_get(1)?;
227            let is_unique = non_unique == 0;
228            let col_name: String = row.try_get(2)?;
229
230            let eg = idx_map.entry(idx_name.clone()).or_insert(Index {
231                name: idx_name,
232                columns: Vec::new(),
233                unique: is_unique,
234            });
235            eg.columns.push(col_name);
236        }
237
238        Ok(idx_map.into_values().collect())
239    }
240}