gluesql_core/store/
alter_table.rs1use {
2 super::{DataRow, Store, StoreMut},
3 crate::{ast::ColumnDef, data::Value, executor::evaluate_stateless, result::Result},
4 async_trait::async_trait,
5 futures::TryStreamExt,
6 serde::Serialize,
7 std::fmt::Debug,
8 thiserror::Error,
9};
10
11#[derive(Error, Serialize, Debug, PartialEq, Eq)]
12pub enum AlterTableError {
13 #[error("Table not found: {0}")]
14 TableNotFound(String),
15
16 #[error("Renaming column not found")]
17 RenamingColumnNotFound,
18
19 #[error("Default value is required: {0:#?}")]
20 DefaultValueRequired(ColumnDef),
21
22 #[error("Already existing column: {0}")]
23 AlreadyExistingColumn(String),
24
25 #[error("Dropping column not found: {0}")]
26 DroppingColumnNotFound(String),
27
28 #[error("Schemaless table does not support ALTER TABLE: {0}")]
29 SchemalessTableFound(String),
30
31 #[error("conflict - Vec expected but Map row found")]
32 ConflictOnUnexpectedMapRowFound,
33}
34
35#[async_trait]
36pub trait AlterTable: Store + StoreMut {
37 async fn rename_schema(&mut self, table_name: &str, new_table_name: &str) -> Result<()> {
38 let mut schema = self
39 .fetch_schema(table_name)
40 .await?
41 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
42 new_table_name.clone_into(&mut schema.table_name);
43 self.insert_schema(&schema).await?;
44
45 let rows = self
46 .scan_data(table_name)
47 .await?
48 .try_collect::<Vec<_>>()
49 .await?;
50
51 self.insert_data(new_table_name, rows).await?;
52 self.delete_schema(table_name).await
53 }
54
55 async fn rename_column(
56 &mut self,
57 table_name: &str,
58 old_column_name: &str,
59 new_column_name: &str,
60 ) -> Result<()> {
61 let mut schema = self
62 .fetch_schema(table_name)
63 .await?
64 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
65
66 let column_defs = schema
67 .column_defs
68 .as_mut()
69 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
70
71 if column_defs
72 .iter()
73 .any(|column_def| column_def.name == new_column_name)
74 {
75 return Err(AlterTableError::AlreadyExistingColumn(new_column_name.to_owned()).into());
76 }
77
78 new_column_name.clone_into(
79 &mut column_defs
80 .iter_mut()
81 .find(|column_def| column_def.name == old_column_name)
82 .ok_or(AlterTableError::RenamingColumnNotFound)?
83 .name,
84 );
85
86 let rows = self
87 .scan_data(table_name)
88 .await?
89 .try_collect::<Vec<_>>()
90 .await?;
91
92 self.insert_schema(&schema).await?;
93 self.insert_data(table_name, rows).await
94 }
95
96 async fn add_column(&mut self, table_name: &str, column_def: &ColumnDef) -> Result<()> {
97 let mut schema = self
98 .fetch_schema(table_name)
99 .await?
100 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
101
102 let default_value = match (column_def.default.as_ref(), column_def.nullable) {
103 (Some(default), _) => evaluate_stateless(None, default).await?.try_into()?,
104 (None, true) => Value::Null,
105 (None, false) => {
106 return Err(AlterTableError::DefaultValueRequired(column_def.clone()).into());
107 }
108 };
109
110 let column_defs = schema
111 .column_defs
112 .as_mut()
113 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
114
115 if column_defs.iter().any(|def| def.name == column_def.name) {
116 return Err(AlterTableError::AlreadyExistingColumn(column_def.name.clone()).into());
117 }
118
119 column_defs.push(column_def.clone());
120
121 let rows = self
122 .scan_data(table_name)
123 .await?
124 .and_then(|(key, mut data_row)| {
125 let default_value = default_value.clone();
126
127 async move {
128 match &mut data_row {
129 DataRow::Map(_) => {
130 Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into())
131 }
132 DataRow::Vec(rows) => {
133 rows.push(default_value);
134
135 Ok((key, data_row))
136 }
137 }
138 }
139 })
140 .try_collect::<Vec<_>>()
141 .await?;
142
143 self.insert_schema(&schema).await?;
144 self.insert_data(table_name, rows).await
145 }
146
147 async fn drop_column(
148 &mut self,
149 table_name: &str,
150 column_name: &str,
151 if_exists: bool,
152 ) -> Result<()> {
153 let mut schema = self
154 .fetch_schema(table_name)
155 .await?
156 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
157
158 let column_defs = schema
159 .column_defs
160 .as_mut()
161 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
162
163 let i = match column_defs
164 .iter()
165 .position(|column_def| column_def.name == column_name)
166 {
167 Some(i) => i,
168 None if if_exists => return Ok(()),
169 None => {
170 return Err(AlterTableError::DroppingColumnNotFound(column_name.to_owned()).into());
171 }
172 };
173
174 column_defs.retain(|column_def| column_def.name != column_name);
175
176 let rows = self
177 .scan_data(table_name)
178 .await?
179 .and_then(|(key, mut data_row)| async move {
180 match &mut data_row {
181 DataRow::Map(_) => Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into()),
182 DataRow::Vec(rows) => {
183 rows.remove(i);
184
185 Ok((key, data_row))
186 }
187 }
188 })
189 .try_collect::<Vec<_>>()
190 .await?;
191
192 self.insert_schema(&schema).await?;
193 self.insert_data(table_name, rows).await
194 }
195}