gluesql_redis_storage/
alter_table.rs1use {
2 super::{RedisStorage, mutex::MutexExt},
3 async_trait::async_trait,
4 gluesql_core::{
5 ast::ColumnDef,
6 data::Value,
7 error::{AlterTableError, Error, Result},
8 store::{AlterTable, DataRow, Store},
9 },
10 redis::Commands,
11};
12
13#[async_trait]
14impl AlterTable for RedisStorage {
15 async fn rename_schema(&mut self, table_name: &str, new_table_name: &str) -> Result<()> {
16 if let Some(mut schema) = self.fetch_schema(table_name).await? {
17 self.redis_delete_schema(table_name)?;
19
20 new_table_name.clone_into(&mut schema.table_name);
21 self.redis_store_schema(&schema)?;
22
23 let redis_key_iter: Vec<String> = self.redis_execute_scan(table_name)?;
24
25 for redis_key in redis_key_iter {
26 if let Some(value) = self.redis_execute_get(&redis_key)? {
27 let key = Self::redis_parse_key(&redis_key)?;
28 let new_key = Self::redis_generate_key(&self.namespace, new_table_name, &key)?;
29
30 self.redis_execute_set(&new_key, &value)?;
31 self.redis_execute_del(&redis_key)?;
32 }
33 }
34 } else {
35 return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
36 }
37
38 Ok(())
39 }
40
41 async fn rename_column(
42 &mut self,
43 table_name: &str,
44 old_column_name: &str,
45 new_column_name: &str,
46 ) -> Result<()> {
47 if let Some(mut schema) = self.fetch_schema(table_name).await? {
48 let column_defs = schema
49 .column_defs
50 .as_mut()
51 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
52
53 if column_defs
54 .iter()
55 .any(|ColumnDef { name, .. }| name == new_column_name)
56 {
57 return Err(
58 AlterTableError::AlreadyExistingColumn(new_column_name.to_owned()).into(),
59 );
60 }
61
62 let column_def = column_defs
63 .iter_mut()
64 .find(|column_def| column_def.name == old_column_name)
65 .ok_or(AlterTableError::RenamingColumnNotFound)?;
66
67 new_column_name.clone_into(&mut column_def.name);
68
69 self.redis_delete_schema(table_name)?;
70 self.redis_store_schema(&schema)?;
71 } else {
72 return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
73 }
74
75 Ok(())
76 }
77
78 async fn add_column(&mut self, table_name: &str, column_def: &ColumnDef) -> Result<()> {
79 if let Some(mut schema) = self.fetch_schema(table_name).await? {
80 let column_defs = schema
81 .column_defs
82 .as_mut()
83 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
84
85 if column_defs
86 .iter()
87 .any(|ColumnDef { name, .. }| name == &column_def.name)
88 {
89 let adding_column = column_def.name.to_owned();
90
91 return Err(AlterTableError::AlreadyExistingColumn(adding_column).into());
92 }
93
94 let ColumnDef {
95 data_type,
96 nullable,
97 default,
98 ..
99 } = column_def;
100
101 let new_value_of_new_column = match (default, nullable) {
102 (Some(expr), _) => {
103 let evaluated = gluesql_core::executor::evaluate_stateless(None, expr).await?;
104
105 evaluated.try_into_value(data_type, *nullable)?
106 }
107 (None, true) => Value::Null,
108 (None, false) => {
109 return Err(AlterTableError::DefaultValueRequired(column_def.clone()).into());
110 }
111 };
112
113 let scan_key = Self::redis_generate_scankey(&self.namespace, table_name);
119 let key_iter: Vec<String> = {
120 let mut conn = self.conn.lock_err()?;
121 conn.scan_match(&scan_key)
122 .map(|iter| iter.collect::<Vec<String>>())
123 .map_err(|_| {
124 Error::StorageMsg(format!(
125 "[RedisStorage] failed to execute SCAN: key={scan_key}"
126 ))
127 })?
128 };
129
130 for key in key_iter {
131 let value = {
132 let mut conn = self.conn.lock_err()?;
133 redis::cmd("GET")
134 .arg(&key)
135 .query::<String>(&mut *conn)
136 .map_err(|_| {
137 Error::StorageMsg(format!(
138 "[RedisStorage] failed to execute GET: key={key}"
139 ))
140 })?
141 };
142
143 let mut row: DataRow = serde_json::from_str(&value).map_err(|e| {
144 Error::StorageMsg(format!(
145 "[RedisStorage] failed to deserialize value={value} error={e}"
146 ))
147 })?;
148 match &mut row {
149 DataRow::Vec(values) => {
150 values.push(new_value_of_new_column.clone());
151 }
152 DataRow::Map(_) => {
153 return Err(Error::StorageMsg(
154 "[RedisStorage] conflict - add_column failed: schemaless row found"
155 .to_owned(),
156 ));
157 }
158 }
159
160 let new_value = serde_json::to_string(&row).map_err(|_e| {
161 Error::StorageMsg(format!(
162 "[RedisStorage] failed to serialize row={row:?} error={_e}"
163 ))
164 })?;
165 let _: () = {
166 let mut conn = self.conn.lock_err()?;
167 redis::cmd("SET")
168 .arg(&key)
169 .arg(new_value)
170 .query(&mut *conn)
171 .map_err(|_| {
172 Error::StorageMsg(format!(
173 "[RedisStorage] add_column: failed to execute SET for row={row:?}"
174 ))
175 })?
176 };
177 }
178
179 column_defs.push(column_def.clone());
180 self.redis_delete_schema(table_name)?; self.redis_store_schema(&schema)?;
182 } else {
183 return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
184 }
185
186 Ok(())
187 }
188
189 async fn drop_column(
190 &mut self,
191 table_name: &str,
192 column_name: &str,
193 if_exists: bool,
194 ) -> Result<()> {
195 if let Some(mut schema) = self.fetch_schema(table_name).await? {
196 let column_defs = schema
197 .column_defs
198 .as_mut()
199 .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
200
201 let column_index = column_defs
202 .iter()
203 .position(|column_def| column_def.name == column_name);
204
205 match column_index {
206 Some(column_index) => {
207 column_defs.remove(column_index);
208
209 let key_iter = self.redis_execute_scan(table_name)?;
210 for key in key_iter {
211 if let Some(value) = self.redis_execute_get(&key)? {
212 let mut row: DataRow = serde_json::from_str(&value).map_err(|e| {
213 Error::StorageMsg(format!(
214 "[RedisStorage] failed to deserialize value={value} error={e}"
215 ))
216 })?;
217 match &mut row {
218 DataRow::Vec(values) => {
219 values.remove(column_index);
220 }
221 DataRow::Map(_) => {
222 return Err(Error::StorageMsg(
223 "[RedisStorage] conflict - drop_column failed: schemaless row found".to_owned(),
224 ));
225 }
226 }
227
228 let new_value = serde_json::to_string(&row).map_err(|e| {
229 Error::StorageMsg(format!(
230 "[RedisStorage] failed to serialize row={row:?} error={e}"
231 ))
232 })?;
233 self.redis_execute_set(&key, &new_value)?;
234 }
235 }
236 }
237 None if if_exists => {}
238 None => {
239 return Err(
240 AlterTableError::DroppingColumnNotFound(column_name.to_owned()).into(),
241 );
242 }
243 };
244
245 self.redis_delete_schema(table_name)?; self.redis_store_schema(&schema)?;
247 } else {
248 return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
249 }
250
251 Ok(())
252 }
253}