gluesql_redis_storage/
alter_table.rs

1use {
2    super::{RedisStorage, mutex::MutexExt},
3    async_trait::async_trait,
4    gluesql_core::{
5        ast::ColumnDef,
6        data::Value,
7        error::{AlterTableError, Error, Result},
8        store::{AlterTable, DataRow, Store},
9    },
10    redis::Commands,
11};
12
13#[async_trait]
14impl AlterTable for RedisStorage {
15    async fn rename_schema(&mut self, table_name: &str, new_table_name: &str) -> Result<()> {
16        if let Some(mut schema) = self.fetch_schema(table_name).await? {
17            // Which should be done first? deleting or storing?
18            self.redis_delete_schema(table_name)?;
19
20            new_table_name.clone_into(&mut schema.table_name);
21            self.redis_store_schema(&schema)?;
22
23            let redis_key_iter: Vec<String> = self.redis_execute_scan(table_name)?;
24
25            for redis_key in redis_key_iter {
26                if let Some(value) = self.redis_execute_get(&redis_key)? {
27                    let key = Self::redis_parse_key(&redis_key)?;
28                    let new_key = Self::redis_generate_key(&self.namespace, new_table_name, &key)?;
29
30                    self.redis_execute_set(&new_key, &value)?;
31                    self.redis_execute_del(&redis_key)?;
32                }
33            }
34        } else {
35            return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
36        }
37
38        Ok(())
39    }
40
41    async fn rename_column(
42        &mut self,
43        table_name: &str,
44        old_column_name: &str,
45        new_column_name: &str,
46    ) -> Result<()> {
47        if let Some(mut schema) = self.fetch_schema(table_name).await? {
48            let column_defs = schema
49                .column_defs
50                .as_mut()
51                .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
52
53            if column_defs
54                .iter()
55                .any(|ColumnDef { name, .. }| name == new_column_name)
56            {
57                return Err(
58                    AlterTableError::AlreadyExistingColumn(new_column_name.to_owned()).into(),
59                );
60            }
61
62            let column_def = column_defs
63                .iter_mut()
64                .find(|column_def| column_def.name == old_column_name)
65                .ok_or(AlterTableError::RenamingColumnNotFound)?;
66
67            new_column_name.clone_into(&mut column_def.name);
68
69            self.redis_delete_schema(table_name)?;
70            self.redis_store_schema(&schema)?;
71        } else {
72            return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
73        }
74
75        Ok(())
76    }
77
78    async fn add_column(&mut self, table_name: &str, column_def: &ColumnDef) -> Result<()> {
79        if let Some(mut schema) = self.fetch_schema(table_name).await? {
80            let column_defs = schema
81                .column_defs
82                .as_mut()
83                .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
84
85            if column_defs
86                .iter()
87                .any(|ColumnDef { name, .. }| name == &column_def.name)
88            {
89                let adding_column = column_def.name.to_owned();
90
91                return Err(AlterTableError::AlreadyExistingColumn(adding_column).into());
92            }
93
94            let ColumnDef {
95                data_type,
96                nullable,
97                default,
98                ..
99            } = column_def;
100
101            let new_value_of_new_column = match (default, nullable) {
102                (Some(expr), _) => {
103                    let evaluated = gluesql_core::executor::evaluate_stateless(None, expr).await?;
104
105                    evaluated.try_into_value(data_type, *nullable)?
106                }
107                (None, true) => Value::Null,
108                (None, false) => {
109                    return Err(AlterTableError::DefaultValueRequired(column_def.clone()).into());
110                }
111            };
112
113            // NOTE: It cannot call self.redis_execute_scan/get/set methods directly.
114            // column_defs has a reference to item and the item has a reference to self.
115            // Therefore it cannot call self.redis_execute_scan method because
116            // it needs to use the mutable reference of self.
117            // Otherwise, it will cause a mutable reference conflict.
118            let scan_key = Self::redis_generate_scankey(&self.namespace, table_name);
119            let key_iter: Vec<String> = {
120                let mut conn = self.conn.lock_err()?;
121                conn.scan_match(&scan_key)
122                    .map(|iter| iter.collect::<Vec<String>>())
123                    .map_err(|_| {
124                        Error::StorageMsg(format!(
125                            "[RedisStorage] failed to execute SCAN: key={scan_key}"
126                        ))
127                    })?
128            };
129
130            for key in key_iter {
131                let value = {
132                    let mut conn = self.conn.lock_err()?;
133                    redis::cmd("GET")
134                        .arg(&key)
135                        .query::<String>(&mut *conn)
136                        .map_err(|_| {
137                            Error::StorageMsg(format!(
138                                "[RedisStorage] failed to execute GET: key={key}"
139                            ))
140                        })?
141                };
142
143                let mut row: DataRow = serde_json::from_str(&value).map_err(|e| {
144                    Error::StorageMsg(format!(
145                        "[RedisStorage] failed to deserialize value={value} error={e}"
146                    ))
147                })?;
148                match &mut row {
149                    DataRow::Vec(values) => {
150                        values.push(new_value_of_new_column.clone());
151                    }
152                    DataRow::Map(_) => {
153                        return Err(Error::StorageMsg(
154                            "[RedisStorage] conflict - add_column failed: schemaless row found"
155                                .to_owned(),
156                        ));
157                    }
158                }
159
160                let new_value = serde_json::to_string(&row).map_err(|_e| {
161                    Error::StorageMsg(format!(
162                        "[RedisStorage] failed to serialize row={row:?} error={_e}"
163                    ))
164                })?;
165                let _: () = {
166                    let mut conn = self.conn.lock_err()?;
167                    redis::cmd("SET")
168                        .arg(&key)
169                        .arg(new_value)
170                        .query(&mut *conn)
171                        .map_err(|_| {
172                            Error::StorageMsg(format!(
173                                "[RedisStorage] add_column: failed to execute SET for row={row:?}"
174                            ))
175                        })?
176                };
177            }
178
179            column_defs.push(column_def.clone());
180            self.redis_delete_schema(table_name)?; // No problem yet, finally it's ok to delete the old schema
181            self.redis_store_schema(&schema)?;
182        } else {
183            return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
184        }
185
186        Ok(())
187    }
188
189    async fn drop_column(
190        &mut self,
191        table_name: &str,
192        column_name: &str,
193        if_exists: bool,
194    ) -> Result<()> {
195        if let Some(mut schema) = self.fetch_schema(table_name).await? {
196            let column_defs = schema
197                .column_defs
198                .as_mut()
199                .ok_or_else(|| AlterTableError::SchemalessTableFound(table_name.to_owned()))?;
200
201            let column_index = column_defs
202                .iter()
203                .position(|column_def| column_def.name == column_name);
204
205            match column_index {
206                Some(column_index) => {
207                    column_defs.remove(column_index);
208
209                    let key_iter = self.redis_execute_scan(table_name)?;
210                    for key in key_iter {
211                        if let Some(value) = self.redis_execute_get(&key)? {
212                            let mut row: DataRow = serde_json::from_str(&value).map_err(|e| {
213                                Error::StorageMsg(format!(
214                                    "[RedisStorage] failed to deserialize value={value} error={e}"
215                                ))
216                            })?;
217                            match &mut row {
218                                DataRow::Vec(values) => {
219                                    values.remove(column_index);
220                                }
221                                DataRow::Map(_) => {
222                                    return Err(Error::StorageMsg(
223                                    "[RedisStorage] conflict - drop_column failed: schemaless row found".to_owned(),
224                                ));
225                                }
226                            }
227
228                            let new_value = serde_json::to_string(&row).map_err(|e| {
229                                Error::StorageMsg(format!(
230                                    "[RedisStorage] failed to serialize row={row:?} error={e}"
231                                ))
232                            })?;
233                            self.redis_execute_set(&key, &new_value)?;
234                        }
235                    }
236                }
237                None if if_exists => {}
238                None => {
239                    return Err(
240                        AlterTableError::DroppingColumnNotFound(column_name.to_owned()).into(),
241                    );
242                }
243            };
244
245            self.redis_delete_schema(table_name)?; // No problem yet, finally it's ok to delete the old schema
246            self.redis_store_schema(&schema)?;
247        } else {
248            return Err(AlterTableError::TableNotFound(table_name.to_owned()).into());
249        }
250
251        Ok(())
252    }
253}