gluesql_core/executor/alter/
table.rs

1use {
2    super::{AlterError, validate, validate_column_names},
3    crate::{
4        ast::{
5            ColumnDef, ColumnUniqueOption, ForeignKey, Query, SetExpr, TableFactor, ToSql, Values,
6        },
7        data::Schema,
8        executor::{evaluate_stateless, select::select},
9        prelude::{DataType, Value},
10        result::Result,
11        store::{GStore, GStoreMut},
12    },
13    futures::stream::TryStreamExt,
14    serde::Serialize,
15    std::fmt,
16};
17
18pub struct CreateTableOptions<'a> {
19    pub target_table_name: &'a str,
20    pub column_defs: Option<&'a [ColumnDef]>,
21    pub if_not_exists: bool,
22    pub source: &'a Option<Box<Query>>,
23    pub engine: &'a Option<String>,
24    pub foreign_keys: &'a Vec<ForeignKey>,
25    pub comment: &'a Option<String>,
26}
27
28pub async fn create_table<T: GStore + GStoreMut>(
29    storage: &mut T,
30    CreateTableOptions {
31        target_table_name,
32        column_defs,
33        if_not_exists,
34        source,
35        engine,
36        foreign_keys,
37        comment,
38    }: CreateTableOptions<'_>,
39) -> Result<()> {
40    let target_columns_defs = match source.as_deref() {
41        Some(Query { body, .. }) => match body {
42            SetExpr::Select(select_query) => match &select_query.from.relation {
43                TableFactor::Table { name, .. } => {
44                    let schema = storage.fetch_schema(name).await?;
45                    let Schema {
46                        column_defs: source_column_defs,
47                        ..
48                    } = schema
49                        .ok_or_else(|| AlterError::CtasSourceTableNotFound(name.to_owned()))?;
50
51                    source_column_defs
52                }
53                TableFactor::Series { .. } => {
54                    let column_def = ColumnDef {
55                        name: "N".into(),
56                        data_type: DataType::Int,
57                        nullable: false,
58                        default: None,
59                        unique: None,
60                        comment: None,
61                    };
62
63                    Some(vec![column_def])
64                }
65                _ => {
66                    return Err(AlterError::Unreachable.into());
67                }
68            },
69            SetExpr::Values(Values(values_list)) => {
70                let first_len = values_list[0].len();
71                let mut column_types = vec![None; first_len];
72
73                for exprs in values_list {
74                    for (i, expr) in exprs.iter().enumerate() {
75                        if column_types[i].is_some() {
76                            continue;
77                        }
78
79                        column_types[i] = evaluate_stateless(None, expr)
80                            .await
81                            .and_then(Value::try_from)
82                            .map(|value| value.get_type())?;
83                    }
84
85                    if column_types.iter().all(Option::is_some) {
86                        break;
87                    }
88                }
89
90                let column_defs = column_types
91                    .iter()
92                    .map(|column_type| match column_type {
93                        Some(column_type) => column_type.to_owned(),
94                        None => DataType::Text,
95                    })
96                    .enumerate()
97                    .map(|(i, data_type)| ColumnDef {
98                        name: format!("column{}", i + 1),
99                        data_type,
100                        nullable: true,
101                        default: None,
102                        unique: None,
103                        comment: None,
104                    })
105                    .collect::<Vec<_>>();
106
107                Some(column_defs)
108            }
109        },
110        None if column_defs.is_some() => column_defs.map(<[ColumnDef]>::to_vec),
111        None => None,
112    };
113
114    if let Some(column_defs) = target_columns_defs.as_deref() {
115        validate_column_names(column_defs)?;
116
117        for column_def in column_defs {
118            validate(column_def).await?;
119        }
120    }
121
122    for foreign_key in foreign_keys {
123        let ForeignKey {
124            referencing_column_name,
125            referenced_table_name,
126            referenced_column_name,
127            ..
128        } = foreign_key;
129
130        let column_defs = if referenced_table_name == target_table_name {
131            target_columns_defs.clone()
132        } else {
133            let referenced_schema = storage
134                .fetch_schema(referenced_table_name)
135                .await?
136                .ok_or_else(|| {
137                    AlterError::ReferencedTableNotFound(referenced_table_name.to_owned())
138                })?;
139
140            referenced_schema.column_defs
141        };
142
143        let referenced_column_def = column_defs
144            .and_then(|column_defs| {
145                column_defs
146                    .into_iter()
147                    .find(|column_def| column_def.name == *referenced_column_name)
148            })
149            .ok_or_else(|| AlterError::ReferencedColumnNotFound(referenced_column_name.to_owned()))?
150            .to_owned();
151
152        let referencing_column_def = target_columns_defs
153            .as_deref()
154            .and_then(|column_defs| {
155                column_defs
156                    .iter()
157                    .find(|column_def| column_def.name == *referencing_column_name)
158            })
159            .ok_or_else(|| {
160                AlterError::ReferencingColumnNotFound(referencing_column_name.to_owned())
161            })?;
162
163        if referencing_column_def.data_type != referenced_column_def.data_type {
164            return Err(AlterError::ForeignKeyDataTypeMismatch {
165                referencing_column: referencing_column_name.to_owned(),
166                referencing_column_type: referencing_column_def.data_type.to_owned(),
167                referenced_column: referenced_column_name.to_owned(),
168                referenced_column_type: referenced_column_def.data_type.to_owned(),
169            }
170            .into());
171        }
172
173        if referenced_column_def.unique != Some(ColumnUniqueOption { is_primary: true }) {
174            return Err(AlterError::ReferencingNonPKColumn {
175                referenced_table: referenced_table_name.to_owned(),
176                referenced_column: referenced_column_name.to_owned(),
177            }
178            .into());
179        }
180    }
181
182    if storage.fetch_schema(target_table_name).await?.is_none() {
183        let schema = Schema {
184            table_name: target_table_name.to_owned(),
185            column_defs: target_columns_defs,
186            indexes: vec![],
187            engine: engine.clone(),
188            foreign_keys: foreign_keys.clone(),
189            comment: comment.clone(),
190        };
191
192        storage.insert_schema(&schema).await?;
193    } else if !if_not_exists {
194        return Err(AlterError::TableAlreadyExists(target_table_name.to_owned()).into());
195    }
196
197    match source {
198        Some(query) => {
199            let rows = select(storage, query, None)
200                .await?
201                .map_ok(Into::into)
202                .try_collect()
203                .await?;
204
205            storage
206                .append_data(target_table_name, rows)
207                .await
208                .map(|_| ())
209        }
210        None => Ok(()),
211    }
212}
213
214pub async fn drop_table<T: GStore + GStoreMut>(
215    storage: &mut T,
216    table_names: &[String],
217    if_exists: bool,
218    cascade: bool,
219) -> Result<usize> {
220    let mut n = 0;
221
222    for table_name in table_names {
223        let schema = storage.fetch_schema(table_name).await?;
224
225        match (schema, if_exists) {
226            (None, true) => {
227                continue;
228            }
229            (None, false) => {
230                return Err(AlterError::TableNotFound(table_name.to_owned()).into());
231            }
232            _ => {}
233        }
234
235        let referencings = storage.fetch_referencings(table_name).await?;
236
237        if !referencings.is_empty() && !cascade {
238            return Err(AlterError::CannotDropTableWithReferencing {
239                referenced_table_name: table_name.into(),
240                referencings,
241            }
242            .into());
243        }
244
245        for Referencing {
246            table_name,
247            foreign_key: ForeignKey { name, .. },
248        } in referencings
249        {
250            let mut schema = storage
251                .fetch_schema(&table_name)
252                .await?
253                .ok_or_else(|| AlterError::TableNotFound(table_name.to_owned()))?;
254            schema
255                .foreign_keys
256                .retain(|foreign_key| foreign_key.name != name);
257            storage.insert_schema(&schema).await?;
258        }
259        storage.delete_schema(table_name).await?;
260
261        n += 1;
262    }
263
264    Ok(n)
265}
266
267#[derive(Debug, PartialEq, Eq, Serialize)]
268pub struct Referencing {
269    pub table_name: String,
270    pub foreign_key: ForeignKey,
271}
272
273impl fmt::Display for Referencing {
274    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
275        write!(
276            f,
277            r#"{} on table "{}""#,
278            self.foreign_key.to_sql(),
279            self.table_name
280        )
281    }
282}
283
#[cfg(test)]
mod tests {
    use {super::*, crate::ast::ReferentialAction};

    /// `Display` renders the constraint SQL followed by the owning table name.
    #[test]
    fn test_referencing_display() {
        let foreign_key = ForeignKey {
            name: "FK_referenced_id-Referenced_id".to_owned(),
            referencing_column_name: "referenced_id".to_owned(),
            referenced_table_name: "Referenced".to_owned(),
            referenced_column_name: "id".to_owned(),
            on_delete: ReferentialAction::NoAction,
            on_update: ReferentialAction::NoAction,
        };
        let referencing = Referencing {
            table_name: "Referencing".to_owned(),
            foreign_key,
        };

        let expected = r#"CONSTRAINT "FK_referenced_id-Referenced_id" FOREIGN KEY ("referenced_id") REFERENCES "Referenced" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION on table "Referencing""#;

        assert_eq!(referencing.to_string(), expected);
    }
}