gluesql_core/executor/alter/
table.rs1use {
2 super::{AlterError, validate, validate_column_names},
3 crate::{
4 ast::{
5 ColumnDef, ColumnUniqueOption, ForeignKey, Query, SetExpr, TableFactor, ToSql, Values,
6 },
7 data::Schema,
8 executor::{evaluate_stateless, select::select},
9 prelude::{DataType, Value},
10 result::Result,
11 store::{GStore, GStoreMut},
12 },
13 futures::stream::TryStreamExt,
14 serde::Serialize,
15 std::fmt,
16};
17
18pub struct CreateTableOptions<'a> {
19 pub target_table_name: &'a str,
20 pub column_defs: Option<&'a [ColumnDef]>,
21 pub if_not_exists: bool,
22 pub source: &'a Option<Box<Query>>,
23 pub engine: &'a Option<String>,
24 pub foreign_keys: &'a Vec<ForeignKey>,
25 pub comment: &'a Option<String>,
26}
27
28pub async fn create_table<T: GStore + GStoreMut>(
29 storage: &mut T,
30 CreateTableOptions {
31 target_table_name,
32 column_defs,
33 if_not_exists,
34 source,
35 engine,
36 foreign_keys,
37 comment,
38 }: CreateTableOptions<'_>,
39) -> Result<()> {
40 let target_columns_defs = match source.as_deref() {
41 Some(Query { body, .. }) => match body {
42 SetExpr::Select(select_query) => match &select_query.from.relation {
43 TableFactor::Table { name, .. } => {
44 let schema = storage.fetch_schema(name).await?;
45 let Schema {
46 column_defs: source_column_defs,
47 ..
48 } = schema
49 .ok_or_else(|| AlterError::CtasSourceTableNotFound(name.to_owned()))?;
50
51 source_column_defs
52 }
53 TableFactor::Series { .. } => {
54 let column_def = ColumnDef {
55 name: "N".into(),
56 data_type: DataType::Int,
57 nullable: false,
58 default: None,
59 unique: None,
60 comment: None,
61 };
62
63 Some(vec![column_def])
64 }
65 _ => {
66 return Err(AlterError::Unreachable.into());
67 }
68 },
69 SetExpr::Values(Values(values_list)) => {
70 let first_len = values_list[0].len();
71 let mut column_types = vec![None; first_len];
72
73 for exprs in values_list {
74 for (i, expr) in exprs.iter().enumerate() {
75 if column_types[i].is_some() {
76 continue;
77 }
78
79 column_types[i] = evaluate_stateless(None, expr)
80 .await
81 .and_then(Value::try_from)
82 .map(|value| value.get_type())?;
83 }
84
85 if column_types.iter().all(Option::is_some) {
86 break;
87 }
88 }
89
90 let column_defs = column_types
91 .iter()
92 .map(|column_type| match column_type {
93 Some(column_type) => column_type.to_owned(),
94 None => DataType::Text,
95 })
96 .enumerate()
97 .map(|(i, data_type)| ColumnDef {
98 name: format!("column{}", i + 1),
99 data_type,
100 nullable: true,
101 default: None,
102 unique: None,
103 comment: None,
104 })
105 .collect::<Vec<_>>();
106
107 Some(column_defs)
108 }
109 },
110 None if column_defs.is_some() => column_defs.map(<[ColumnDef]>::to_vec),
111 None => None,
112 };
113
114 if let Some(column_defs) = target_columns_defs.as_deref() {
115 validate_column_names(column_defs)?;
116
117 for column_def in column_defs {
118 validate(column_def).await?;
119 }
120 }
121
122 for foreign_key in foreign_keys {
123 let ForeignKey {
124 referencing_column_name,
125 referenced_table_name,
126 referenced_column_name,
127 ..
128 } = foreign_key;
129
130 let column_defs = if referenced_table_name == target_table_name {
131 target_columns_defs.clone()
132 } else {
133 let referenced_schema = storage
134 .fetch_schema(referenced_table_name)
135 .await?
136 .ok_or_else(|| {
137 AlterError::ReferencedTableNotFound(referenced_table_name.to_owned())
138 })?;
139
140 referenced_schema.column_defs
141 };
142
143 let referenced_column_def = column_defs
144 .and_then(|column_defs| {
145 column_defs
146 .into_iter()
147 .find(|column_def| column_def.name == *referenced_column_name)
148 })
149 .ok_or_else(|| AlterError::ReferencedColumnNotFound(referenced_column_name.to_owned()))?
150 .to_owned();
151
152 let referencing_column_def = target_columns_defs
153 .as_deref()
154 .and_then(|column_defs| {
155 column_defs
156 .iter()
157 .find(|column_def| column_def.name == *referencing_column_name)
158 })
159 .ok_or_else(|| {
160 AlterError::ReferencingColumnNotFound(referencing_column_name.to_owned())
161 })?;
162
163 if referencing_column_def.data_type != referenced_column_def.data_type {
164 return Err(AlterError::ForeignKeyDataTypeMismatch {
165 referencing_column: referencing_column_name.to_owned(),
166 referencing_column_type: referencing_column_def.data_type.to_owned(),
167 referenced_column: referenced_column_name.to_owned(),
168 referenced_column_type: referenced_column_def.data_type.to_owned(),
169 }
170 .into());
171 }
172
173 if referenced_column_def.unique != Some(ColumnUniqueOption { is_primary: true }) {
174 return Err(AlterError::ReferencingNonPKColumn {
175 referenced_table: referenced_table_name.to_owned(),
176 referenced_column: referenced_column_name.to_owned(),
177 }
178 .into());
179 }
180 }
181
182 if storage.fetch_schema(target_table_name).await?.is_none() {
183 let schema = Schema {
184 table_name: target_table_name.to_owned(),
185 column_defs: target_columns_defs,
186 indexes: vec![],
187 engine: engine.clone(),
188 foreign_keys: foreign_keys.clone(),
189 comment: comment.clone(),
190 };
191
192 storage.insert_schema(&schema).await?;
193 } else if !if_not_exists {
194 return Err(AlterError::TableAlreadyExists(target_table_name.to_owned()).into());
195 }
196
197 match source {
198 Some(query) => {
199 let rows = select(storage, query, None)
200 .await?
201 .map_ok(Into::into)
202 .try_collect()
203 .await?;
204
205 storage
206 .append_data(target_table_name, rows)
207 .await
208 .map(|_| ())
209 }
210 None => Ok(()),
211 }
212}
213
214pub async fn drop_table<T: GStore + GStoreMut>(
215 storage: &mut T,
216 table_names: &[String],
217 if_exists: bool,
218 cascade: bool,
219) -> Result<usize> {
220 let mut n = 0;
221
222 for table_name in table_names {
223 let schema = storage.fetch_schema(table_name).await?;
224
225 match (schema, if_exists) {
226 (None, true) => {
227 continue;
228 }
229 (None, false) => {
230 return Err(AlterError::TableNotFound(table_name.to_owned()).into());
231 }
232 _ => {}
233 }
234
235 let referencings = storage.fetch_referencings(table_name).await?;
236
237 if !referencings.is_empty() && !cascade {
238 return Err(AlterError::CannotDropTableWithReferencing {
239 referenced_table_name: table_name.into(),
240 referencings,
241 }
242 .into());
243 }
244
245 for Referencing {
246 table_name,
247 foreign_key: ForeignKey { name, .. },
248 } in referencings
249 {
250 let mut schema = storage
251 .fetch_schema(&table_name)
252 .await?
253 .ok_or_else(|| AlterError::TableNotFound(table_name.to_owned()))?;
254 schema
255 .foreign_keys
256 .retain(|foreign_key| foreign_key.name != name);
257 storage.insert_schema(&schema).await?;
258 }
259 storage.delete_schema(table_name).await?;
260
261 n += 1;
262 }
263
264 Ok(n)
265}
266
267#[derive(Debug, PartialEq, Eq, Serialize)]
268pub struct Referencing {
269 pub table_name: String,
270 pub foreign_key: ForeignKey,
271}
272
273impl fmt::Display for Referencing {
274 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
275 write!(
276 f,
277 r#"{} on table "{}""#,
278 self.foreign_key.to_sql(),
279 self.table_name
280 )
281 }
282}
283
#[cfg(test)]
mod tests {
    use {super::*, crate::ast::ReferentialAction};

    /// `Display` must print the constraint SQL followed by the declaring table.
    #[test]
    fn test_referencing_display() {
        let foreign_key = ForeignKey {
            name: "FK_referenced_id-Referenced_id".to_owned(),
            referencing_column_name: "referenced_id".to_owned(),
            referenced_table_name: "Referenced".to_owned(),
            referenced_column_name: "id".to_owned(),
            on_delete: ReferentialAction::NoAction,
            on_update: ReferentialAction::NoAction,
        };
        let subject = Referencing {
            table_name: "Referencing".to_owned(),
            foreign_key,
        };

        let expected = r#"CONSTRAINT "FK_referenced_id-Referenced_id" FOREIGN KEY ("referenced_id") REFERENCES "Referenced" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION on table "Referencing""#;

        assert_eq!(subject.to_string(), expected);
    }
}