gluesql_core/executor/alter/
table.rs1use {
2 super::{AlterError, validate, validate_column_names},
3 crate::{
4 ast::{
5 ColumnDef, ColumnUniqueOption, ForeignKey, Query, SetExpr, TableFactor, ToSql, Values,
6 },
7 data::Schema,
8 executor::{evaluate_stateless, select::select},
9 prelude::{DataType, Value},
10 result::Result,
11 store::{GStore, GStoreMut},
12 },
13 futures::stream::TryStreamExt,
14 serde::Serialize,
15 std::fmt,
16};
17
18pub struct CreateTableOptions<'a> {
19 pub target_table_name: &'a str,
20 pub column_defs: Option<&'a [ColumnDef]>,
21 pub if_not_exists: bool,
22 pub source: &'a Option<Box<Query>>,
23 pub engine: &'a Option<String>,
24 pub foreign_keys: &'a Vec<ForeignKey>,
25 pub comment: &'a Option<String>,
26}
27
28pub async fn create_table<T: GStore + GStoreMut>(
29 storage: &mut T,
30 CreateTableOptions {
31 target_table_name,
32 column_defs,
33 if_not_exists,
34 source,
35 engine,
36 foreign_keys,
37 comment,
38 }: CreateTableOptions<'_>,
39) -> Result<()> {
40 let target_columns_defs = match source.as_deref() {
41 Some(Query { body, .. }) => match body {
42 SetExpr::Select(select_query) => match &select_query.from.relation {
43 TableFactor::Table { name, .. } => {
44 let schema = storage.fetch_schema(name).await?;
45 let Schema {
46 column_defs: source_column_defs,
47 ..
48 } = schema
49 .ok_or_else(|| AlterError::CtasSourceTableNotFound(name.to_owned()))?;
50
51 source_column_defs
52 }
53 TableFactor::Series { .. } => {
54 let column_def = ColumnDef {
55 name: "N".into(),
56 data_type: DataType::Int,
57 nullable: false,
58 default: None,
59 unique: None,
60 comment: None,
61 };
62
63 Some(vec![column_def])
64 }
65 _ => {
66 return Err(AlterError::Unreachable.into());
67 }
68 },
69 SetExpr::Values(Values(values_list)) => {
70 let first_len = values_list[0].len();
71 let mut column_types = vec![None; first_len];
72
73 for exprs in values_list {
74 for (i, expr) in exprs.iter().enumerate() {
75 if column_types[i].is_some() {
76 continue;
77 }
78
79 column_types[i] = evaluate_stateless(None, expr)
80 .await
81 .and_then(Value::try_from)
82 .map(|value| value.get_type())?;
83 }
84
85 if column_types.iter().all(Option::is_some) {
86 break;
87 }
88 }
89
90 let column_defs = column_types
91 .iter()
92 .map(|column_type| match column_type {
93 Some(column_type) => column_type.to_owned(),
94 None => DataType::Text,
95 })
96 .enumerate()
97 .map(|(i, data_type)| ColumnDef {
98 name: format!("column{}", i + 1),
99 data_type,
100 nullable: true,
101 default: None,
102 unique: None,
103 comment: None,
104 })
105 .collect::<Vec<_>>();
106
107 Some(column_defs)
108 }
109 },
110 None if column_defs.is_some() => column_defs.map(<[ColumnDef]>::to_vec),
111 None => None,
112 };
113
114 if let Some(column_defs) = target_columns_defs.as_deref() {
115 validate_column_names(column_defs)?;
116
117 for column_def in column_defs {
118 validate(column_def).await?;
119 }
120 }
121
122 for foreign_key in foreign_keys {
123 let ForeignKey {
124 referencing_column_name,
125 referenced_table_name,
126 referenced_column_name,
127 ..
128 } = foreign_key;
129
130 let column_defs = if referenced_table_name == target_table_name {
131 target_columns_defs.clone()
132 } else {
133 let referenced_schema = storage
134 .fetch_schema(referenced_table_name)
135 .await?
136 .ok_or_else(|| {
137 AlterError::ReferencedTableNotFound(referenced_table_name.to_owned())
138 })?;
139
140 referenced_schema.column_defs
141 };
142
143 let referenced_column_def = column_defs
144 .and_then(|column_defs| {
145 column_defs
146 .into_iter()
147 .find(|column_def| column_def.name == *referenced_column_name)
148 })
149 .ok_or_else(|| AlterError::ReferencedColumnNotFound(referenced_column_name.to_owned()))?
150 .clone();
151
152 let referencing_column_def = target_columns_defs
153 .as_deref()
154 .and_then(|column_defs| {
155 column_defs
156 .iter()
157 .find(|column_def| column_def.name == *referencing_column_name)
158 })
159 .ok_or_else(|| {
160 AlterError::ReferencingColumnNotFound(referencing_column_name.to_owned())
161 })?;
162
163 if referencing_column_def.data_type != referenced_column_def.data_type {
164 return Err(AlterError::ForeignKeyDataTypeMismatch {
165 referencing_column: referencing_column_name.to_owned(),
166 referencing_column_type: referencing_column_def.data_type.clone(),
167 referenced_column: referenced_column_name.to_owned(),
168 referenced_column_type: referenced_column_def.data_type.clone(),
169 }
170 .into());
171 }
172
173 if referenced_column_def.unique != Some(ColumnUniqueOption { is_primary: true }) {
174 return Err(AlterError::ReferencingNonPKColumn {
175 referenced_table: referenced_table_name.to_owned(),
176 referenced_column: referenced_column_name.to_owned(),
177 }
178 .into());
179 }
180 }
181
182 if storage.fetch_schema(target_table_name).await?.is_none() {
183 let schema = Schema {
184 table_name: target_table_name.to_owned(),
185 column_defs: target_columns_defs,
186 indexes: vec![],
187 engine: engine.clone(),
188 foreign_keys: foreign_keys.clone(),
189 comment: comment.clone(),
190 };
191
192 storage.insert_schema(&schema).await?;
193 } else if !if_not_exists {
194 return Err(AlterError::TableAlreadyExists(target_table_name.to_owned()).into());
195 }
196
197 match source {
198 Some(query) => {
199 let rows = select(storage, query, None)
200 .await?
201 .map_ok(Into::into)
202 .try_collect()
203 .await?;
204
205 storage.append_data(target_table_name, rows).await
206 }
207 None => Ok(()),
208 }
209}
210
211pub async fn drop_table<T: GStore + GStoreMut>(
212 storage: &mut T,
213 table_names: &[String],
214 if_exists: bool,
215 cascade: bool,
216) -> Result<usize> {
217 let mut n = 0;
218
219 for table_name in table_names {
220 let schema = storage.fetch_schema(table_name).await?;
221
222 match (schema, if_exists) {
223 (None, true) => {
224 continue;
225 }
226 (None, false) => {
227 return Err(AlterError::TableNotFound(table_name.to_owned()).into());
228 }
229 _ => {}
230 }
231
232 let referencings = storage.fetch_referencings(table_name).await?;
233
234 if !referencings.is_empty() && !cascade {
235 return Err(AlterError::CannotDropTableWithReferencing {
236 referenced_table_name: table_name.into(),
237 referencings,
238 }
239 .into());
240 }
241
242 for Referencing {
243 table_name,
244 foreign_key: ForeignKey { name, .. },
245 } in referencings
246 {
247 let mut schema = storage
248 .fetch_schema(&table_name)
249 .await?
250 .ok_or_else(|| AlterError::TableNotFound(table_name.clone()))?;
251 schema
252 .foreign_keys
253 .retain(|foreign_key| foreign_key.name != name);
254 storage.insert_schema(&schema).await?;
255 }
256 storage.delete_schema(table_name).await?;
257
258 n += 1;
259 }
260
261 Ok(n)
262}
263
264#[derive(Debug, PartialEq, Eq, Serialize)]
265pub struct Referencing {
266 pub table_name: String,
267 pub foreign_key: ForeignKey,
268}
269
270impl fmt::Display for Referencing {
271 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
272 write!(
273 f,
274 r#"{} on table "{}""#,
275 self.foreign_key.to_sql(),
276 self.table_name
277 )
278 }
279}
280
#[cfg(test)]
mod tests {
    use {super::*, crate::ast::ReferentialAction};

    /// `Referencing` renders as the constraint SQL followed by the owning table name.
    #[test]
    fn test_referencing_display() {
        let foreign_key = ForeignKey {
            name: "FK_referenced_id-Referenced_id".to_owned(),
            referencing_column_name: "referenced_id".to_owned(),
            referenced_table_name: "Referenced".to_owned(),
            referenced_column_name: "id".to_owned(),
            on_delete: ReferentialAction::NoAction,
            on_update: ReferentialAction::NoAction,
        };
        let referencing = Referencing {
            table_name: "Referencing".to_owned(),
            foreign_key,
        };

        let expected = r#"CONSTRAINT "FK_referenced_id-Referenced_id" FOREIGN KEY ("referenced_id") REFERENCES "Referenced" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION on table "Referencing""#;

        assert_eq!(referencing.to_string(), expected);
    }
}