gluesql_core/store/
alter_table.rs

1use {
2    super::{DataRow, Store, StoreMut},
3    crate::{ast::ColumnDef, data::Value, executor::evaluate_stateless, result::Result},
4    async_trait::async_trait,
5    futures::TryStreamExt,
6    serde::Serialize,
7    std::fmt::Debug,
8    thiserror::Error,
9};
10
11#[derive(Error, Serialize, Debug, PartialEq, Eq)]
12pub enum AlterTableError {
13    #[error("Table not found: {0}")]
14    TableNotFound(String),
15
16    #[error("Renaming column not found")]
17    RenamingColumnNotFound,
18
19    #[error("Default value is required: {0:#?}")]
20    DefaultValueRequired(ColumnDef),
21
22    #[error("Already existing column: {0}")]
23    AlreadyExistingColumn(String),
24
25    #[error("Dropping column not found: {0}")]
26    DroppingColumnNotFound(String),
27
28    #[error("Schemaless table does not support ALTER TABLE: {0}")]
29    SchemalessTableFound(String),
30
31    #[error("conflict - Vec expected but Map row found")]
32    ConflictOnUnexpectedMapRowFound,
33}
34
35#[async_trait(?Send)]
36pub trait AlterTable: Store + StoreMut {
37    async fn rename_schema(&mut self, table_name: &str, new_table_name: &str) -> Result<()> {
38        let mut schema = self
39            .fetch_schema(table_name)
40            .await?
41            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
42        schema.table_name = new_table_name.to_owned();
43        self.insert_schema(&schema).await?;
44
45        let rows = self
46            .scan_data(table_name)
47            .await?
48            .try_collect::<Vec<_>>()
49            .await?;
50
51        self.insert_data(new_table_name, rows).await?;
52        self.delete_schema(table_name).await
53    }
54
55    async fn rename_column(
56        &mut self,
57        table_name: &str,
58        old_column_name: &str,
59        new_column_name: &str,
60    ) -> Result<()> {
61        let mut schema = self
62            .fetch_schema(table_name)
63            .await?
64            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
65
66        let column_defs = schema
67            .column_defs
68            .as_mut()
69            .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
70
71        if column_defs
72            .iter()
73            .any(|column_def| column_def.name == new_column_name)
74        {
75            return Err(AlterTableError::AlreadyExistingColumn(new_column_name.to_owned()).into());
76        }
77
78        column_defs
79            .iter_mut()
80            .find(|column_def| column_def.name == old_column_name)
81            .ok_or_else(|| AlterTableError::RenamingColumnNotFound)?
82            .name = new_column_name.to_owned();
83
84        let rows = self
85            .scan_data(table_name)
86            .await?
87            .try_collect::<Vec<_>>()
88            .await?;
89
90        self.insert_schema(&schema).await?;
91        self.insert_data(table_name, rows).await
92    }
93
94    async fn add_column(&mut self, table_name: &str, column_def: &ColumnDef) -> Result<()> {
95        let mut schema = self
96            .fetch_schema(table_name)
97            .await?
98            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
99
100        let default_value = match (column_def.default.as_ref(), column_def.nullable) {
101            (Some(default), _) => evaluate_stateless(None, default).await?.try_into()?,
102            (None, true) => Value::Null,
103            (None, false) => {
104                return Err(AlterTableError::DefaultValueRequired(column_def.clone()).into())
105            }
106        };
107
108        let column_defs = schema
109            .column_defs
110            .as_mut()
111            .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
112
113        if column_defs.iter().any(|def| def.name == column_def.name) {
114            return Err(AlterTableError::AlreadyExistingColumn(column_def.name.clone()).into());
115        }
116
117        column_defs.push(column_def.clone());
118
119        let rows = self
120            .scan_data(table_name)
121            .await?
122            .and_then(|(key, mut data_row)| {
123                let default_value = default_value.clone();
124
125                async move {
126                    match &mut data_row {
127                        DataRow::Map(_) => {
128                            Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into())
129                        }
130                        DataRow::Vec(ref mut rows) => {
131                            rows.push(default_value);
132
133                            Ok((key, data_row))
134                        }
135                    }
136                }
137            })
138            .try_collect::<Vec<_>>()
139            .await?;
140
141        self.insert_schema(&schema).await?;
142        self.insert_data(table_name, rows).await
143    }
144
145    async fn drop_column(
146        &mut self,
147        table_name: &str,
148        column_name: &str,
149        if_exists: bool,
150    ) -> Result<()> {
151        let mut schema = self
152            .fetch_schema(table_name)
153            .await?
154            .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
155
156        let column_defs = schema
157            .column_defs
158            .as_mut()
159            .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
160
161        let i = match column_defs
162            .iter()
163            .position(|column_def| column_def.name == column_name)
164        {
165            Some(i) => i,
166            None if if_exists => return Ok(()),
167            None => {
168                return Err(AlterTableError::DroppingColumnNotFound(column_name.to_owned()).into())
169            }
170        };
171
172        column_defs.retain(|column_def| column_def.name != column_name);
173
174        let rows = self
175            .scan_data(table_name)
176            .await?
177            .and_then(|(key, mut data_row)| async move {
178                match &mut data_row {
179                    DataRow::Map(_) => Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into()),
180                    DataRow::Vec(ref mut rows) => {
181                        rows.remove(i);
182
183                        Ok((key, data_row))
184                    }
185                }
186            })
187            .try_collect::<Vec<_>>()
188            .await?;
189
190        self.insert_schema(&schema).await?;
191        self.insert_data(table_name, rows).await
192    }
193}