sea_schema/postgres/discovery/
mod.rs

1//! To query & parse MySQL's INFORMATION_SCHEMA and construct a [`Schema`]
2
3use crate::debug_print;
4use crate::postgres::def::*;
5use crate::postgres::parser::{
6    parse_table_constraint_query_results, parse_unique_index_query_results,
7};
8use crate::postgres::query::{
9    ColumnQueryResult, EnumQueryResult, SchemaQueryBuilder, TableConstraintsQueryResult,
10    TableQueryResult, UniqueIndexQueryResult,
11};
12use crate::sqlx_types::SqlxError;
13use futures::future;
14use sea_query::{Alias, Iden, IntoIden, SeaRc};
15use std::collections::HashMap;
16
17mod executor;
18pub use executor::*;
19
20pub(crate) type EnumVariantMap = HashMap<String, Vec<String>>;
21
22pub struct SchemaDiscovery {
23    pub query: SchemaQueryBuilder,
24    pub executor: Executor,
25    pub schema: SeaRc<dyn Iden>,
26}
27
28impl SchemaDiscovery {
29    pub fn new<E>(executor: E, schema: &str) -> Self
30    where
31        E: IntoExecutor,
32    {
33        Self {
34            query: SchemaQueryBuilder::default(),
35            executor: executor.into_executor(),
36            schema: Alias::new(schema).into_iden(),
37        }
38    }
39
40    pub async fn discover(&self) -> Result<Schema, SqlxError> {
41        let enums: EnumVariantMap = self
42            .discover_enums()
43            .await?
44            .into_iter()
45            .map(|enum_def| (enum_def.typename, enum_def.values))
46            .collect();
47        let tables = future::try_join_all(
48            self.discover_tables()
49                .await?
50                .into_iter()
51                .map(|t| (self, t, &enums))
52                .map(Self::discover_table_static),
53        )
54        .await?;
55
56        Ok(Schema {
57            schema: self.schema.to_string(),
58            tables,
59        })
60    }
61
62    pub async fn discover_tables(&self) -> Result<Vec<TableInfo>, SqlxError> {
63        let rows = self
64            .executor
65            .fetch_all(self.query.query_tables(self.schema.clone()))
66            .await?;
67
68        let tables: Vec<TableInfo> = rows
69            .iter()
70            .map(|row| {
71                let result: TableQueryResult = row.into();
72                debug_print!("{:?}", result);
73                let table = result.parse();
74                debug_print!("{:?}", table);
75                table
76            })
77            .collect();
78
79        Ok(tables)
80    }
81
82    async fn discover_table_static(
83        params: (&Self, TableInfo, &EnumVariantMap),
84    ) -> Result<TableDef, SqlxError> {
85        let this = params.0;
86        let info = params.1;
87        let enums = params.2;
88        Self::discover_table(this, info, enums).await
89    }
90
91    pub async fn discover_table(
92        &self,
93        info: TableInfo,
94        enums: &EnumVariantMap,
95    ) -> Result<TableDef, SqlxError> {
96        let table = SeaRc::new(Alias::new(info.name.as_str()));
97        let columns = self
98            .discover_columns(self.schema.clone(), table.clone(), enums)
99            .await?;
100        let constraints = self
101            .discover_constraints(self.schema.clone(), table.clone())
102            .await?;
103        let (
104            check_constraints,
105            not_null_constraints,
106            primary_key_constraints,
107            reference_constraints,
108            exclusion_constraints,
109        ) = constraints.into_iter().fold(
110            (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
111            |mut acc, constraint| {
112                match constraint {
113                    Constraint::Check(check) => acc.0.push(check),
114                    Constraint::NotNull(not_null) => acc.1.push(not_null),
115                    Constraint::Unique(_) => (),
116                    Constraint::PrimaryKey(primary_key) => acc.2.push(primary_key),
117                    Constraint::References(references) => acc.3.push(references),
118                    Constraint::Exclusion(exclusion) => acc.4.push(exclusion),
119                }
120                acc
121            },
122        );
123
124        let unique_constraints = self
125            .discover_unique_indexes(self.schema.clone(), table.clone())
126            .await?;
127
128        Ok(TableDef {
129            info,
130            columns,
131            check_constraints,
132            not_null_constraints,
133            unique_constraints,
134            primary_key_constraints,
135            reference_constraints,
136            exclusion_constraints,
137        })
138    }
139
140    pub async fn discover_columns(
141        &self,
142        schema: SeaRc<dyn Iden>,
143        table: SeaRc<dyn Iden>,
144        enums: &EnumVariantMap,
145    ) -> Result<Vec<ColumnInfo>, SqlxError> {
146        let rows = self
147            .executor
148            .fetch_all(self.query.query_columns(schema.clone(), table.clone()))
149            .await?;
150
151        Ok(rows
152            .into_iter()
153            .map(|row| {
154                let result: ColumnQueryResult = (&row).into();
155                debug_print!("{:?}", result);
156                let column = result.parse(enums);
157                debug_print!("{:?}", column);
158                column
159            })
160            .collect())
161    }
162
163    pub async fn discover_constraints(
164        &self,
165        schema: SeaRc<dyn Iden>,
166        table: SeaRc<dyn Iden>,
167    ) -> Result<Vec<Constraint>, SqlxError> {
168        let rows = self
169            .executor
170            .fetch_all(
171                self.query
172                    .query_table_constraints(schema.clone(), table.clone()),
173            )
174            .await?;
175
176        let results = rows.into_iter().map(|row| {
177            let result: TableConstraintsQueryResult = (&row).into();
178            debug_print!("{:?}", result);
179            result
180        });
181
182        Ok(parse_table_constraint_query_results(Box::new(results))
183            .map(|index| {
184                debug_print!("{:?}", index);
185                index
186            })
187            .collect())
188    }
189
190    pub async fn discover_unique_indexes(
191        &self,
192        schema: SeaRc<dyn Iden>,
193        table: SeaRc<dyn Iden>,
194    ) -> Result<Vec<Unique>, SqlxError> {
195        let rows = self
196            .executor
197            .fetch_all(
198                self.query
199                    .query_table_unique_indexes(schema.clone(), table.clone()),
200            )
201            .await?;
202
203        let results = rows.into_iter().map(|row| {
204            let result: UniqueIndexQueryResult = (&row).into();
205            debug_print!("{:?}", result);
206            result
207        });
208
209        Ok(parse_unique_index_query_results(Box::new(results))
210            .map(|index| {
211                debug_print!("{:?}", index);
212                index
213            })
214            .collect())
215    }
216
217    pub async fn discover_enums(&self) -> Result<Vec<EnumDef>, SqlxError> {
218        let rows = self.executor.fetch_all(self.query.query_enums()).await?;
219
220        let enum_rows = rows.into_iter().map(|row| {
221            let result: EnumQueryResult = (&row).into();
222            debug_print!("{:?}", result);
223            result
224        });
225
226        let map = enum_rows.fold(
227            HashMap::new(),
228            |mut map: HashMap<String, Vec<String>>,
229             EnumQueryResult {
230                 typename,
231                 enumlabel,
232             }| {
233                if let Some(entry_exists) = map.get_mut(&typename) {
234                    entry_exists.push(enumlabel);
235                } else {
236                    map.insert(typename, vec![enumlabel]);
237                }
238                map
239            },
240        );
241
242        Ok(map
243            .into_iter()
244            .map(|(typename, values)| EnumDef { values, typename })
245            .collect())
246    }
247}