gluesql_core/executor/alter/
table.rs

1use {
2    super::{AlterError, validate, validate_column_names},
3    crate::{
4        ast::{
5            ColumnDef, ColumnUniqueOption, ForeignKey, Query, SetExpr, TableFactor, ToSql, Values,
6        },
7        data::Schema,
8        executor::{evaluate_stateless, select::select},
9        prelude::{DataType, Value},
10        result::Result,
11        store::{GStore, GStoreMut},
12    },
13    futures::stream::TryStreamExt,
14    serde::Serialize,
15    std::fmt,
16};
17
18pub struct CreateTableOptions<'a> {
19    pub target_table_name: &'a str,
20    pub column_defs: Option<&'a [ColumnDef]>,
21    pub if_not_exists: bool,
22    pub source: &'a Option<Box<Query>>,
23    pub engine: &'a Option<String>,
24    pub foreign_keys: &'a Vec<ForeignKey>,
25    pub comment: &'a Option<String>,
26}
27
28pub async fn create_table<T: GStore + GStoreMut>(
29    storage: &mut T,
30    CreateTableOptions {
31        target_table_name,
32        column_defs,
33        if_not_exists,
34        source,
35        engine,
36        foreign_keys,
37        comment,
38    }: CreateTableOptions<'_>,
39) -> Result<()> {
40    let target_columns_defs = match source.as_deref() {
41        Some(Query { body, .. }) => match body {
42            SetExpr::Select(select_query) => match &select_query.from.relation {
43                TableFactor::Table { name, .. } => {
44                    let schema = storage.fetch_schema(name).await?;
45                    let Schema {
46                        column_defs: source_column_defs,
47                        ..
48                    } = schema
49                        .ok_or_else(|| AlterError::CtasSourceTableNotFound(name.to_owned()))?;
50
51                    source_column_defs
52                }
53                TableFactor::Series { .. } => {
54                    let column_def = ColumnDef {
55                        name: "N".into(),
56                        data_type: DataType::Int,
57                        nullable: false,
58                        default: None,
59                        unique: None,
60                        comment: None,
61                    };
62
63                    Some(vec![column_def])
64                }
65                _ => {
66                    return Err(AlterError::Unreachable.into());
67                }
68            },
69            SetExpr::Values(Values(values_list)) => {
70                let first_len = values_list[0].len();
71                let mut column_types = vec![None; first_len];
72
73                for exprs in values_list {
74                    for (i, expr) in exprs.iter().enumerate() {
75                        if column_types[i].is_some() {
76                            continue;
77                        }
78
79                        column_types[i] = evaluate_stateless(None, expr)
80                            .await
81                            .and_then(Value::try_from)
82                            .map(|value| value.get_type())?;
83                    }
84
85                    if column_types.iter().all(Option::is_some) {
86                        break;
87                    }
88                }
89
90                let column_defs = column_types
91                    .iter()
92                    .map(|column_type| match column_type {
93                        Some(column_type) => column_type.to_owned(),
94                        None => DataType::Text,
95                    })
96                    .enumerate()
97                    .map(|(i, data_type)| ColumnDef {
98                        name: format!("column{}", i + 1),
99                        data_type,
100                        nullable: true,
101                        default: None,
102                        unique: None,
103                        comment: None,
104                    })
105                    .collect::<Vec<_>>();
106
107                Some(column_defs)
108            }
109        },
110        None if column_defs.is_some() => column_defs.map(<[ColumnDef]>::to_vec),
111        None => None,
112    };
113
114    if let Some(column_defs) = target_columns_defs.as_deref() {
115        validate_column_names(column_defs)?;
116
117        for column_def in column_defs {
118            validate(column_def).await?;
119        }
120    }
121
122    for foreign_key in foreign_keys {
123        let ForeignKey {
124            referencing_column_name,
125            referenced_table_name,
126            referenced_column_name,
127            ..
128        } = foreign_key;
129
130        let column_defs = if referenced_table_name == target_table_name {
131            target_columns_defs.clone()
132        } else {
133            let referenced_schema = storage
134                .fetch_schema(referenced_table_name)
135                .await?
136                .ok_or_else(|| {
137                    AlterError::ReferencedTableNotFound(referenced_table_name.to_owned())
138                })?;
139
140            referenced_schema.column_defs
141        };
142
143        let referenced_column_def = column_defs
144            .and_then(|column_defs| {
145                column_defs
146                    .into_iter()
147                    .find(|column_def| column_def.name == *referenced_column_name)
148            })
149            .ok_or_else(|| AlterError::ReferencedColumnNotFound(referenced_column_name.to_owned()))?
150            .clone();
151
152        let referencing_column_def = target_columns_defs
153            .as_deref()
154            .and_then(|column_defs| {
155                column_defs
156                    .iter()
157                    .find(|column_def| column_def.name == *referencing_column_name)
158            })
159            .ok_or_else(|| {
160                AlterError::ReferencingColumnNotFound(referencing_column_name.to_owned())
161            })?;
162
163        if referencing_column_def.data_type != referenced_column_def.data_type {
164            return Err(AlterError::ForeignKeyDataTypeMismatch {
165                referencing_column: referencing_column_name.to_owned(),
166                referencing_column_type: referencing_column_def.data_type.clone(),
167                referenced_column: referenced_column_name.to_owned(),
168                referenced_column_type: referenced_column_def.data_type.clone(),
169            }
170            .into());
171        }
172
173        if referenced_column_def.unique != Some(ColumnUniqueOption { is_primary: true }) {
174            return Err(AlterError::ReferencingNonPKColumn {
175                referenced_table: referenced_table_name.to_owned(),
176                referenced_column: referenced_column_name.to_owned(),
177            }
178            .into());
179        }
180    }
181
182    if storage.fetch_schema(target_table_name).await?.is_none() {
183        let schema = Schema {
184            table_name: target_table_name.to_owned(),
185            column_defs: target_columns_defs,
186            indexes: vec![],
187            engine: engine.clone(),
188            foreign_keys: foreign_keys.clone(),
189            comment: comment.clone(),
190        };
191
192        storage.insert_schema(&schema).await?;
193    } else if !if_not_exists {
194        return Err(AlterError::TableAlreadyExists(target_table_name.to_owned()).into());
195    }
196
197    match source {
198        Some(query) => {
199            let rows = select(storage, query, None)
200                .await?
201                .map_ok(Into::into)
202                .try_collect()
203                .await?;
204
205            storage.append_data(target_table_name, rows).await
206        }
207        None => Ok(()),
208    }
209}
210
211pub async fn drop_table<T: GStore + GStoreMut>(
212    storage: &mut T,
213    table_names: &[String],
214    if_exists: bool,
215    cascade: bool,
216) -> Result<usize> {
217    let mut n = 0;
218
219    for table_name in table_names {
220        let schema = storage.fetch_schema(table_name).await?;
221
222        match (schema, if_exists) {
223            (None, true) => {
224                continue;
225            }
226            (None, false) => {
227                return Err(AlterError::TableNotFound(table_name.to_owned()).into());
228            }
229            _ => {}
230        }
231
232        let referencings = storage.fetch_referencings(table_name).await?;
233
234        if !referencings.is_empty() && !cascade {
235            return Err(AlterError::CannotDropTableWithReferencing {
236                referenced_table_name: table_name.into(),
237                referencings,
238            }
239            .into());
240        }
241
242        for Referencing {
243            table_name,
244            foreign_key: ForeignKey { name, .. },
245        } in referencings
246        {
247            let mut schema = storage
248                .fetch_schema(&table_name)
249                .await?
250                .ok_or_else(|| AlterError::TableNotFound(table_name.clone()))?;
251            schema
252                .foreign_keys
253                .retain(|foreign_key| foreign_key.name != name);
254            storage.insert_schema(&schema).await?;
255        }
256        storage.delete_schema(table_name).await?;
257
258        n += 1;
259    }
260
261    Ok(n)
262}
263
264#[derive(Debug, PartialEq, Eq, Serialize)]
265pub struct Referencing {
266    pub table_name: String,
267    pub foreign_key: ForeignKey,
268}
269
270impl fmt::Display for Referencing {
271    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
272        write!(
273            f,
274            r#"{} on table "{}""#,
275            self.foreign_key.to_sql(),
276            self.table_name
277        )
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use {super::*, crate::ast::ReferentialAction};
284
285    #[test]
286    fn test_referencing_display() {
287        let referencing = Referencing {
288            table_name: "Referencing".to_owned(),
289            foreign_key: ForeignKey {
290                name: "FK_referenced_id-Referenced_id".to_owned(),
291                referencing_column_name: "referenced_id".to_owned(),
292                referenced_table_name: "Referenced".to_owned(),
293                referenced_column_name: "id".to_owned(),
294                on_delete: ReferentialAction::NoAction,
295                on_update: ReferentialAction::NoAction,
296            },
297        };
298
299        assert_eq!(
300            format!("{referencing}"),
301            r#"CONSTRAINT "FK_referenced_id-Referenced_id" FOREIGN KEY ("referenced_id") REFERENCES "Referenced" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION on table "Referencing""#
302        );
303    }
304}