gluesql_core/store/
alter_table.rs1use {
2 super::{DataRow, Store, StoreMut},
3 crate::{ast::ColumnDef, data::Value, executor::evaluate_stateless, result::Result},
4 async_trait::async_trait,
5 futures::TryStreamExt,
6 serde::Serialize,
7 std::fmt::Debug,
8 thiserror::Error,
9};
10
11#[derive(Error, Serialize, Debug, PartialEq, Eq)]
12pub enum AlterTableError {
13 #[error("Table not found: {0}")]
14 TableNotFound(String),
15
16 #[error("Renaming column not found")]
17 RenamingColumnNotFound,
18
19 #[error("Default value is required: {0:#?}")]
20 DefaultValueRequired(ColumnDef),
21
22 #[error("Already existing column: {0}")]
23 AlreadyExistingColumn(String),
24
25 #[error("Dropping column not found: {0}")]
26 DroppingColumnNotFound(String),
27
28 #[error("Schemaless table does not support ALTER TABLE: {0}")]
29 SchemalessTableFound(String),
30
31 #[error("conflict - Vec expected but Map row found")]
32 ConflictOnUnexpectedMapRowFound,
33}
34
35#[async_trait(?Send)]
36pub trait AlterTable: Store + StoreMut {
37 async fn rename_schema(&mut self, table_name: &str, new_table_name: &str) -> Result<()> {
38 let mut schema = self
39 .fetch_schema(table_name)
40 .await?
41 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
42 schema.table_name = new_table_name.to_owned();
43 self.insert_schema(&schema).await?;
44
45 let rows = self
46 .scan_data(table_name)
47 .await?
48 .try_collect::<Vec<_>>()
49 .await?;
50
51 self.insert_data(new_table_name, rows).await?;
52 self.delete_schema(table_name).await
53 }
54
55 async fn rename_column(
56 &mut self,
57 table_name: &str,
58 old_column_name: &str,
59 new_column_name: &str,
60 ) -> Result<()> {
61 let mut schema = self
62 .fetch_schema(table_name)
63 .await?
64 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
65
66 let column_defs = schema
67 .column_defs
68 .as_mut()
69 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
70
71 if column_defs
72 .iter()
73 .any(|column_def| column_def.name == new_column_name)
74 {
75 return Err(AlterTableError::AlreadyExistingColumn(new_column_name.to_owned()).into());
76 }
77
78 column_defs
79 .iter_mut()
80 .find(|column_def| column_def.name == old_column_name)
81 .ok_or_else(|| AlterTableError::RenamingColumnNotFound)?
82 .name = new_column_name.to_owned();
83
84 let rows = self
85 .scan_data(table_name)
86 .await?
87 .try_collect::<Vec<_>>()
88 .await?;
89
90 self.insert_schema(&schema).await?;
91 self.insert_data(table_name, rows).await
92 }
93
94 async fn add_column(&mut self, table_name: &str, column_def: &ColumnDef) -> Result<()> {
95 let mut schema = self
96 .fetch_schema(table_name)
97 .await?
98 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
99
100 let default_value = match (column_def.default.as_ref(), column_def.nullable) {
101 (Some(default), _) => evaluate_stateless(None, default).await?.try_into()?,
102 (None, true) => Value::Null,
103 (None, false) => {
104 return Err(AlterTableError::DefaultValueRequired(column_def.clone()).into())
105 }
106 };
107
108 let column_defs = schema
109 .column_defs
110 .as_mut()
111 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
112
113 if column_defs.iter().any(|def| def.name == column_def.name) {
114 return Err(AlterTableError::AlreadyExistingColumn(column_def.name.clone()).into());
115 }
116
117 column_defs.push(column_def.clone());
118
119 let rows = self
120 .scan_data(table_name)
121 .await?
122 .and_then(|(key, mut data_row)| {
123 let default_value = default_value.clone();
124
125 async move {
126 match &mut data_row {
127 DataRow::Map(_) => {
128 Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into())
129 }
130 DataRow::Vec(ref mut rows) => {
131 rows.push(default_value);
132
133 Ok((key, data_row))
134 }
135 }
136 }
137 })
138 .try_collect::<Vec<_>>()
139 .await?;
140
141 self.insert_schema(&schema).await?;
142 self.insert_data(table_name, rows).await
143 }
144
145 async fn drop_column(
146 &mut self,
147 table_name: &str,
148 column_name: &str,
149 if_exists: bool,
150 ) -> Result<()> {
151 let mut schema = self
152 .fetch_schema(table_name)
153 .await?
154 .ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;
155
156 let column_defs = schema
157 .column_defs
158 .as_mut()
159 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
160
161 let i = match column_defs
162 .iter()
163 .position(|column_def| column_def.name == column_name)
164 {
165 Some(i) => i,
166 None if if_exists => return Ok(()),
167 None => {
168 return Err(AlterTableError::DroppingColumnNotFound(column_name.to_owned()).into())
169 }
170 };
171
172 column_defs.retain(|column_def| column_def.name != column_name);
173
174 let rows = self
175 .scan_data(table_name)
176 .await?
177 .and_then(|(key, mut data_row)| async move {
178 match &mut data_row {
179 DataRow::Map(_) => Err(AlterTableError::ConflictOnUnexpectedMapRowFound.into()),
180 DataRow::Vec(ref mut rows) => {
181 rows.remove(i);
182
183 Ok((key, data_row))
184 }
185 }
186 })
187 .try_collect::<Vec<_>>()
188 .await?;
189
190 self.insert_schema(&schema).await?;
191 self.insert_data(table_name, rows).await
192 }
193}