sea_schema/mysql/discovery/
mod.rs

1//! To query & parse MySQL's INFORMATION_SCHEMA and construct a [`Schema`]
2
3use crate::debug_print;
4use crate::mysql::def::*;
5use crate::mysql::parser::{parse_foreign_key_query_results, parse_index_query_results};
6use crate::mysql::query::{
7    ColumnQueryResult, ForeignKeyQueryResult, IndexQueryResult, SchemaQueryBuilder,
8    TableQueryResult, VersionQueryResult,
9};
10use crate::sqlx_types::SqlxError;
11use futures::future;
12use sea_query::{Alias, Iden, IntoIden, SeaRc};
13
14mod executor;
15pub use executor::*;
16
17pub struct SchemaDiscovery {
18    pub query: SchemaQueryBuilder,
19    pub executor: Executor,
20    pub schema: SeaRc<dyn Iden>,
21}
22
23impl SchemaDiscovery {
24    pub fn new<E>(executor: E, schema: &str) -> Self
25    where
26        E: IntoExecutor,
27    {
28        Self {
29            query: SchemaQueryBuilder::default(),
30            executor: executor.into_executor(),
31            schema: Alias::new(schema).into_iden(),
32        }
33    }
34
35    pub async fn discover(mut self) -> Result<Schema, SqlxError> {
36        self.query = SchemaQueryBuilder::new(self.discover_system().await?);
37        let tables = self.discover_tables().await?;
38        let tables = future::try_join_all(
39            tables
40                .into_iter()
41                .map(|t| (&self, t))
42                .map(Self::discover_table_static),
43        )
44        .await?;
45
46        Ok(Schema {
47            schema: self.schema.to_string(),
48            system: self.query.system,
49            tables,
50        })
51    }
52
53    pub async fn discover_system(&mut self) -> Result<SystemInfo, SqlxError> {
54        let rows = self.executor.fetch_all(self.query.query_version()).await?;
55
56        #[allow(clippy::never_loop)]
57        for row in rows.iter() {
58            let result: VersionQueryResult = row.into();
59            debug_print!("{:?}", result);
60            let version = result.parse();
61            debug_print!("{:?}", version);
62            return Ok(version);
63        }
64        Err(SqlxError::RowNotFound)
65    }
66
67    pub async fn discover_tables(&mut self) -> Result<Vec<TableInfo>, SqlxError> {
68        let rows = self
69            .executor
70            .fetch_all(self.query.query_tables(self.schema.clone()))
71            .await?;
72
73        let tables: Vec<TableInfo> = rows
74            .iter()
75            .map(|row| {
76                let result: TableQueryResult = row.into();
77                debug_print!("{:?}", result);
78                let table = result.parse();
79                debug_print!("{:?}", table);
80                table
81            })
82            .collect();
83
84        Ok(tables)
85    }
86
87    async fn discover_table_static(params: (&Self, TableInfo)) -> Result<TableDef, SqlxError> {
88        let this = params.0;
89        let info = params.1;
90        Self::discover_table(this, info).await
91    }
92
93    pub async fn discover_table(&self, info: TableInfo) -> Result<TableDef, SqlxError> {
94        let table = SeaRc::new(Alias::new(info.name.as_str()));
95        let columns = self
96            .discover_columns(self.schema.clone(), table.clone(), &self.query.system)
97            .await?;
98        let indexes = self
99            .discover_indexes(self.schema.clone(), table.clone())
100            .await?;
101        let foreign_keys = self
102            .discover_foreign_keys(self.schema.clone(), table.clone())
103            .await?;
104
105        Ok(TableDef {
106            info,
107            columns,
108            indexes,
109            foreign_keys,
110        })
111    }
112
113    pub async fn discover_columns(
114        &self,
115        schema: SeaRc<dyn Iden>,
116        table: SeaRc<dyn Iden>,
117        system: &SystemInfo,
118    ) -> Result<Vec<ColumnInfo>, SqlxError> {
119        let rows = self
120            .executor
121            .fetch_all(self.query.query_columns(schema.clone(), table.clone()))
122            .await?;
123
124        let columns = rows
125            .iter()
126            .map(|row| {
127                let result: ColumnQueryResult = row.into();
128                debug_print!("{:?}", result);
129                let column = result.parse(system);
130                debug_print!("{:?}", column);
131                column
132            })
133            .collect::<Vec<_>>();
134
135        Ok(columns)
136    }
137
138    pub async fn discover_indexes(
139        &self,
140        schema: SeaRc<dyn Iden>,
141        table: SeaRc<dyn Iden>,
142    ) -> Result<Vec<IndexInfo>, SqlxError> {
143        let rows = self
144            .executor
145            .fetch_all(self.query.query_indexes(schema.clone(), table.clone()))
146            .await?;
147
148        let results = rows.into_iter().map(|row| {
149            let result: IndexQueryResult = (&row).into();
150            debug_print!("{:?}", result);
151            result
152        });
153
154        Ok(parse_index_query_results(Box::new(results))
155            .map(|index| {
156                debug_print!("{:?}", index);
157                index
158            })
159            .collect())
160    }
161
162    pub async fn discover_foreign_keys(
163        &self,
164        schema: SeaRc<dyn Iden>,
165        table: SeaRc<dyn Iden>,
166    ) -> Result<Vec<ForeignKeyInfo>, SqlxError> {
167        let rows = self
168            .executor
169            .fetch_all(self.query.query_foreign_key(schema.clone(), table.clone()))
170            .await?;
171
172        let results = rows.into_iter().map(|row| {
173            let result: ForeignKeyQueryResult = (&row).into();
174            debug_print!("{:?}", result);
175            result
176        });
177
178        Ok(parse_foreign_key_query_results(Box::new(results))
179            .map(|index| {
180                debug_print!("{:?}", index);
181                index
182            })
183            .collect())
184    }
185}