gluesql_core/store/
alter_table.rs

1use {
2    super::{DataRow, Store, StoreMut},
3    crate::{ast::ColumnDef, data::Value, executor::evaluate_stateless, result::Result},
4    async_trait::async_trait,
5    futures::TryStreamExt,
6    serde::Serialize,
7    std::fmt::Debug,
8    thiserror::Error,
9};
10
11#[derive(Error, Serialize, Debug, PartialEq, Eq)]
12pub enum AlterTableError {
13    #[error("Table not found: {0}")]
14    TableNotFound(String),
15
16    #[error("Renaming column not found")]
17    RenamingColumnNotFound,
18
19    #[error("Default value is required: {0:#?}")]
20    DefaultValueRequired(ColumnDef),
21
22    #[error("Already existing column: {0}")]
23    AlreadyExistingColumn(String),
24
25    #[error("Dropping column not found: {0}")]
26    DroppingColumnNotFound(String),
27
28    #[error("Schemaless table does not support ALTER TABLE: {0}")]
29    SchemalessTableFound(String),
30
31    #[error("conflict - Vec expected but Map row found")]
32    ConflictOnUnexpectedMapRowFound,
33}
34
35#[async_trait]
36pub trait AlterTable: Store + StoreMut {
37    async fn rename_schema(&mut self, table_name: &str, new_table_name: &str) -> Result<()> {
38        let mut schema = self
39            .fetch_schema(table_name)
40            .await?
41            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
42        new_table_name.clone_into(&mut schema.table_name);
43        self.insert_schema(&schema).await?;
44
45        let rows = self
46            .scan_data(table_name)
47            .await?
48            .try_collect::<Vec<_>>()
49            .await?;
50
51        self.insert_data(new_table_name, rows).await?;
52        self.delete_schema(table_name).await
53    }
54
55    async fn rename_column(
56        &mut self,
57        table_name: &str,
58        old_column_name: &str,
59        new_column_name: &str,
60    ) -> Result<()> {
61        let mut schema = self
62            .fetch_schema(table_name)
63            .await?
64            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
65
66        let column_defs = schema
67            .column_defs
68            .as_mut()
69            .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
70
71        if column_defs
72            .iter()
73            .any(|column_def| column_def.name == new_column_name)
74        {
75            return Err(AlterTableError::AlreadyExistingColumn(new_column_name.to_owned()).into());
76        }
77
78        new_column_name.clone_into(
79            &mut column_defs
80                .iter_mut()
81                .find(|column_def| column_def.name == old_column_name)
82                .ok_or(AlterTableError::RenamingColumnNotFound)?
83                .name,
84        );
85
86        let rows = self
87            .scan_data(table_name)
88            .await?
89            .try_collect::<Vec<_>>()
90            .await?;
91
92        self.insert_schema(&schema).await?;
93        self.insert_data(table_name, rows).await
94    }
95
96    async fn add_column(&mut self, table_name: &str, column_def: &ColumnDef) -> Result<()> {
97        let mut schema = self
98            .fetch_schema(table_name)
99            .await?
100            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
101
102        let default_value = match (column_def.default.as_ref(), column_def.nullable) {
103            (Some(default), _) => evaluate_stateless(None, default).await?.try_into()?,
104            (None, true) => Value::Null,
105            (None, false) => {
106                return Err(AlterTableError::DefaultValueRequired(column_def.clone()).into());
107            }
108        };
109
110        let column_defs = schema
111            .column_defs
112            .as_mut()
113            .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
114
115        if column_defs.iter().any(|def| def.name == column_def.name) {
116            return Err(AlterTableError::AlreadyExistingColumn(column_def.name.clone()).into());
117        }
118
119        column_defs.push(column_def.clone());
120
121        let rows = self
122            .scan_data(table_name)
123            .await?
124            .and_then(|(key, mut data_row)| {
125                let default_value = default_value.clone();
126
127                async move {
128                    match &mut data_row {
129                        DataRow::Map(_) => {
130                            Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into())
131                        }
132                        DataRow::Vec(rows) => {
133                            rows.push(default_value);
134
135                            Ok((key, data_row))
136                        }
137                    }
138                }
139            })
140            .try_collect::<Vec<_>>()
141            .await?;
142
143        self.insert_schema(&schema).await?;
144        self.insert_data(table_name, rows).await
145    }
146
147    async fn drop_column(
148        &mut self,
149        table_name: &str,
150        column_name: &str,
151        if_exists: bool,
152    ) -> Result<()> {
153        let mut schema = self
154            .fetch_schema(table_name)
155            .await?
156            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
157
158        let column_defs = schema
159            .column_defs
160            .as_mut()
161            .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
162
163        let i = match column_defs
164            .iter()
165            .position(|column_def| column_def.name == column_name)
166        {
167            Some(i) => i,
168            None if if_exists => return Ok(()),
169            None => {
170                return Err(AlterTableError::DroppingColumnNotFound(column_name.to_owned()).into());
171            }
172        };
173
174        column_defs.retain(|column_def| column_def.name != column_name);
175
176        let rows = self
177            .scan_data(table_name)
178            .await?
179            .and_then(|(key, mut data_row)| async move {
180                match &mut data_row {
181                    DataRow::Map(_) => Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into()),
182                    DataRow::Vec(rows) => {
183                        rows.remove(i);
184
185                        Ok((key, data_row))
186                    }
187                }
188            })
189            .try_collect::<Vec<_>>()
190            .await?;
191
192        self.insert_schema(&schema).await?;
193        self.insert_data(table_name, rows).await
194    }
195}