sea_schema/postgres/discovery/
mod.rs1use crate::debug_print;
4use crate::postgres::def::*;
5use crate::postgres::parser::{
6 parse_table_constraint_query_results, parse_unique_index_query_results,
7};
8use crate::postgres::query::{
9 ColumnQueryResult, EnumQueryResult, SchemaQueryBuilder, TableConstraintsQueryResult,
10 TableQueryResult, UniqueIndexQueryResult,
11};
12use crate::sqlx_types::SqlxError;
13use futures::future;
14use sea_query::{Alias, Iden, IntoIden, SeaRc};
15use std::collections::HashMap;
16
17mod executor;
18pub use executor::*;
19
/// Lookup table from an enum type name to the list of labels of that enum.
///
/// `SchemaDiscovery::discover` fills it from the discovered enum definitions
/// (`typename` -> `values`) and hands it to the column parser.
pub(crate) type EnumVariantMap = HashMap<String, Vec<String>>;
21
22pub struct SchemaDiscovery {
23 pub query: SchemaQueryBuilder,
24 pub executor: Executor,
25 pub schema: SeaRc<dyn Iden>,
26}
27
28impl SchemaDiscovery {
29 pub fn new<E>(executor: E, schema: &str) -> Self
30 where
31 E: IntoExecutor,
32 {
33 Self {
34 query: SchemaQueryBuilder::default(),
35 executor: executor.into_executor(),
36 schema: Alias::new(schema).into_iden(),
37 }
38 }
39
40 pub async fn discover(&self) -> Result<Schema, SqlxError> {
41 let enums: EnumVariantMap = self
42 .discover_enums()
43 .await?
44 .into_iter()
45 .map(|enum_def| (enum_def.typename, enum_def.values))
46 .collect();
47 let tables = future::try_join_all(
48 self.discover_tables()
49 .await?
50 .into_iter()
51 .map(|t| (self, t, &enums))
52 .map(Self::discover_table_static),
53 )
54 .await?;
55
56 Ok(Schema {
57 schema: self.schema.to_string(),
58 tables,
59 })
60 }
61
62 pub async fn discover_tables(&self) -> Result<Vec<TableInfo>, SqlxError> {
63 let rows = self
64 .executor
65 .fetch_all(self.query.query_tables(self.schema.clone()))
66 .await?;
67
68 let tables: Vec<TableInfo> = rows
69 .iter()
70 .map(|row| {
71 let result: TableQueryResult = row.into();
72 debug_print!("{:?}", result);
73 let table = result.parse();
74 debug_print!("{:?}", table);
75 table
76 })
77 .collect();
78
79 Ok(tables)
80 }
81
82 async fn discover_table_static(
83 params: (&Self, TableInfo, &EnumVariantMap),
84 ) -> Result<TableDef, SqlxError> {
85 let this = params.0;
86 let info = params.1;
87 let enums = params.2;
88 Self::discover_table(this, info, enums).await
89 }
90
91 pub async fn discover_table(
92 &self,
93 info: TableInfo,
94 enums: &EnumVariantMap,
95 ) -> Result<TableDef, SqlxError> {
96 let table = SeaRc::new(Alias::new(info.name.as_str()));
97 let columns = self
98 .discover_columns(self.schema.clone(), table.clone(), enums)
99 .await?;
100 let constraints = self
101 .discover_constraints(self.schema.clone(), table.clone())
102 .await?;
103 let (
104 check_constraints,
105 not_null_constraints,
106 primary_key_constraints,
107 reference_constraints,
108 exclusion_constraints,
109 ) = constraints.into_iter().fold(
110 (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
111 |mut acc, constraint| {
112 match constraint {
113 Constraint::Check(check) => acc.0.push(check),
114 Constraint::NotNull(not_null) => acc.1.push(not_null),
115 Constraint::Unique(_) => (),
116 Constraint::PrimaryKey(primary_key) => acc.2.push(primary_key),
117 Constraint::References(references) => acc.3.push(references),
118 Constraint::Exclusion(exclusion) => acc.4.push(exclusion),
119 }
120 acc
121 },
122 );
123
124 let unique_constraints = self
125 .discover_unique_indexes(self.schema.clone(), table.clone())
126 .await?;
127
128 Ok(TableDef {
129 info,
130 columns,
131 check_constraints,
132 not_null_constraints,
133 unique_constraints,
134 primary_key_constraints,
135 reference_constraints,
136 exclusion_constraints,
137 })
138 }
139
140 pub async fn discover_columns(
141 &self,
142 schema: SeaRc<dyn Iden>,
143 table: SeaRc<dyn Iden>,
144 enums: &EnumVariantMap,
145 ) -> Result<Vec<ColumnInfo>, SqlxError> {
146 let rows = self
147 .executor
148 .fetch_all(self.query.query_columns(schema.clone(), table.clone()))
149 .await?;
150
151 Ok(rows
152 .into_iter()
153 .map(|row| {
154 let result: ColumnQueryResult = (&row).into();
155 debug_print!("{:?}", result);
156 let column = result.parse(enums);
157 debug_print!("{:?}", column);
158 column
159 })
160 .collect())
161 }
162
163 pub async fn discover_constraints(
164 &self,
165 schema: SeaRc<dyn Iden>,
166 table: SeaRc<dyn Iden>,
167 ) -> Result<Vec<Constraint>, SqlxError> {
168 let rows = self
169 .executor
170 .fetch_all(
171 self.query
172 .query_table_constraints(schema.clone(), table.clone()),
173 )
174 .await?;
175
176 let results = rows.into_iter().map(|row| {
177 let result: TableConstraintsQueryResult = (&row).into();
178 debug_print!("{:?}", result);
179 result
180 });
181
182 Ok(parse_table_constraint_query_results(Box::new(results))
183 .map(|index| {
184 debug_print!("{:?}", index);
185 index
186 })
187 .collect())
188 }
189
190 pub async fn discover_unique_indexes(
191 &self,
192 schema: SeaRc<dyn Iden>,
193 table: SeaRc<dyn Iden>,
194 ) -> Result<Vec<Unique>, SqlxError> {
195 let rows = self
196 .executor
197 .fetch_all(
198 self.query
199 .query_table_unique_indexes(schema.clone(), table.clone()),
200 )
201 .await?;
202
203 let results = rows.into_iter().map(|row| {
204 let result: UniqueIndexQueryResult = (&row).into();
205 debug_print!("{:?}", result);
206 result
207 });
208
209 Ok(parse_unique_index_query_results(Box::new(results))
210 .map(|index| {
211 debug_print!("{:?}", index);
212 index
213 })
214 .collect())
215 }
216
217 pub async fn discover_enums(&self) -> Result<Vec<EnumDef>, SqlxError> {
218 let rows = self.executor.fetch_all(self.query.query_enums()).await?;
219
220 let enum_rows = rows.into_iter().map(|row| {
221 let result: EnumQueryResult = (&row).into();
222 debug_print!("{:?}", result);
223 result
224 });
225
226 let map = enum_rows.fold(
227 HashMap::new(),
228 |mut map: HashMap<String, Vec<String>>,
229 EnumQueryResult {
230 typename,
231 enumlabel,
232 }| {
233 if let Some(entry_exists) = map.get_mut(&typename) {
234 entry_exists.push(enumlabel);
235 } else {
236 map.insert(typename, vec![enumlabel]);
237 }
238 map
239 },
240 );
241
242 Ok(map
243 .into_iter()
244 .map(|(typename, values)| EnumDef { values, typename })
245 .collect())
246 }
247}