gluesql_redis_storage/
lib.rs

1#![deny(clippy::str_to_string)]
2
3mod alter_table;
4mod index;
5mod metadata;
6mod mutex;
7mod transaction;
8
9use mutex::MutexExt;
10use {
11    async_trait::async_trait,
12    futures::stream::iter,
13    gluesql_core::{
14        chrono::Utc,
15        data::{CustomFunction as StructCustomFunction, Key, Schema, Value},
16        error::{Error, Result},
17        store::{CustomFunction, CustomFunctionMut, DataRow, RowIter, Store, StoreMut},
18    },
19    redis::{Commands, Connection},
20    std::{collections::BTreeMap, sync::Mutex},
21};
22
23pub struct RedisStorage {
24    pub namespace: String,
25    pub conn: Mutex<Connection>,
26}
27
28impl RedisStorage {
29    pub fn new(namespace: &str, url: &str, port: u16) -> Self {
30        let redis_url = format!("redis://{url}:{port}");
31        let conn = redis::Client::open(redis_url)
32            .expect("Invalid connection URL")
33            .get_connection()
34            .expect("failed to connect to Redis");
35
36        RedisStorage {
37            namespace: namespace.to_owned(),
38            conn: Mutex::new(conn),
39        }
40    }
41
42    ///
43    /// Make a key to insert/delete a value with the namespace, table-name.
44    ///
45    /// Redis documentation recommends to use ':' as a separator for namespace and table-name.
46    /// But it is not a good idea when using serde_json to serialize/deserialize a key.
47    /// JSON uses ':' as a separator for key and value. So it conflicts with the JSON format.
48    /// Therefore I use '#' as a separator: "namespace"#"table-name"#"key"#"value".
49    ///
50    fn redis_generate_key(namespace: &str, table_name: &str, key: &Key) -> Result<String> {
51        let k = serde_json::to_string(key).map_err(|e| {
52            Error::StorageMsg(format!(
53                "[RedisStorage] failed to serialize key key:{key:?}, error={e}"
54            ))
55        })?;
56        Ok(format!("{namespace}#{table_name}#{k}"))
57    }
58
59    ///
60    /// Parse a redis-key to get the original key of the table
61    ///
62    pub fn redis_parse_key(redis_key: &str) -> Result<Key> {
63        let split_key = redis_key.split('#').collect::<Vec<&str>>();
64        serde_json::from_str(split_key[2]).map_err(|e| {
65            Error::StorageMsg(format!(
66                "[RedisStorage] failed to deserialize key: key={redis_key} error={e}"
67            ))
68        })
69    }
70
71    ///
72    /// Make a key pattern to do scan and get all data in the namespace
73    ///
74    fn redis_generate_scankey(namespace: &str, tablename: &str) -> String {
75        // First I used "{}#{}*" pattern. It had a problem when using
76        // similar table-names such like Test, TestA and TestB.
77        // When scanning Test, it gets all data from Test, TestA and TestB.
78        // Therefore it is very important to use the # twice.
79        format!("{namespace}#{tablename}#*")
80    }
81
82    ///
83    /// Make a key pattern to do scan and get all schemas in the namespace
84    ///
85    fn redis_generate_schema_key(namespace: &str, table_name: &str) -> String {
86        format!("#schema#{namespace}#{table_name}#")
87    }
88
89    fn redis_generate_scan_schema_key(namespace: &str) -> String {
90        format!("#schema#{namespace}#*")
91    }
92
93    fn redis_generate_metadata_key(
94        namespace: &str,
95        tablename: &str,
96        metadata_name: &str,
97    ) -> String {
98        format!("#metadata#{namespace}#{tablename}#{metadata_name}#")
99    }
100
101    fn redis_generate_scan_metadata_key(namespace: &str, tablename: &str) -> String {
102        format!("#metadata#{namespace}#{tablename}#*")
103    }
104
105    fn redis_generate_scan_all_metadata_key(namespace: &str) -> String {
106        format!("#metadata#{namespace}#*")
107    }
108
109    fn redis_execute_get(&mut self, key: &str) -> Result<Option<String>> {
110        let mut conn = self.conn.lock_err()?;
111        let value = redis::cmd("GET")
112            .arg(key)
113            .query::<String>(&mut *conn)
114            .map_err(|e| {
115                Error::StorageMsg(format!(
116                    "[RedisStorage] failed to execute GET: key={key} error={e}"
117                ))
118            })?;
119
120        Ok(Some(value))
121    }
122
123    fn redis_execute_set(&mut self, key: &str, value: &str) -> Result<()> {
124        let mut conn = self.conn.lock_err()?;
125        redis::cmd("SET")
126            .arg(key)
127            .arg(value)
128            .query::<()>(&mut *conn)
129            .map_err(|e| {
130                Error::StorageMsg(format!(
131                    "[RedisStorage] failed to execute SET: key={key} value={value} error={e}"
132                ))
133            })?;
134
135        Ok(())
136    }
137
138    pub fn redis_execute_del(&mut self, key: &str) -> Result<()> {
139        let mut conn = self.conn.lock_err()?;
140        redis::cmd("DEL")
141            .arg(key)
142            .query::<()>(&mut *conn)
143            .map_err(|e| {
144                Error::StorageMsg(format!(
145                    "[RedisStorage] failed to execute DEL: key={key} error={e}"
146                ))
147            })?;
148
149        Ok(())
150    }
151
152    pub fn redis_execute_scan(&mut self, table_name: &str) -> Result<Vec<String>> {
153        let key = Self::redis_generate_scankey(&self.namespace, table_name);
154        let redis_keys: Vec<String> = {
155            let mut conn = self.conn.lock_err()?;
156            conn.scan_match(&key)
157                .map(|iter| iter.collect::<Vec<String>>())
158                .map_err(|e| {
159                    Error::StorageMsg(format!(
160                        "[RedisStorage] failed to scan data: key={key} error={e}"
161                    ))
162                })?
163        };
164
165        Ok(redis_keys)
166    }
167
168    pub fn redis_store_schema(&mut self, schema: &Schema) -> Result<()> {
169        let schema_value = serde_json::to_string(schema).map_err(|e| {
170            Error::StorageMsg(format!(
171                "[RedisStorage] failed to serialize schema={schema:?} error={e}"
172            ))
173        })?;
174        let schema_key = Self::redis_generate_schema_key(&self.namespace, &schema.table_name);
175        self.redis_execute_set(&schema_key, &schema_value)?;
176
177        Ok(())
178    }
179
180    pub fn redis_delete_schema(&mut self, table_name: &str) -> Result<()> {
181        let schema_key = Self::redis_generate_schema_key(&self.namespace, table_name);
182        // It's already if the schema is already removed by another client.
183        if let Ok(Some(schema_value)) = self.redis_execute_get(&schema_key) {
184            let schema = serde_json::from_str::<Schema>(&schema_value).map_err(|e| {
185                Error::StorageMsg(format!(
186                    "[RedisStorage] failed to deserialize schema={schema_value:?} error={e}"
187                ))
188            })?;
189            if schema.table_name == table_name {
190                self.redis_execute_del(&schema_key)?;
191            }
192        }
193
194        Ok(())
195    }
196}
197
198#[async_trait]
199impl CustomFunction for RedisStorage {
200    async fn fetch_function<'a>(
201        &'a self,
202        _func_name: &str,
203    ) -> Result<Option<&'a StructCustomFunction>> {
204        Err(Error::StorageMsg(
205            "[RedisStorage] fetch_function is not supported yet".to_owned(),
206        ))
207    }
208
209    async fn fetch_all_functions<'a>(&'a self) -> Result<Vec<&'a StructCustomFunction>> {
210        Err(Error::StorageMsg(
211            "[RedisStorage] fetch_all_functions is not supported yet".to_owned(),
212        ))
213    }
214}
215
216#[async_trait]
217impl CustomFunctionMut for RedisStorage {
218    async fn insert_function(&mut self, _func: StructCustomFunction) -> Result<()> {
219        Err(Error::StorageMsg(
220            "[RedisStorage] insert_function is not supported yet".to_owned(),
221        ))
222    }
223
224    async fn delete_function(&mut self, _func_name: &str) -> Result<()> {
225        Err(Error::StorageMsg(
226            "[RedisStorage] delete_function is not supported yet".to_owned(),
227        ))
228    }
229}
230
231#[async_trait]
232impl Store for RedisStorage {
233    async fn fetch_all_schemas(&self) -> Result<Vec<Schema>> {
234        let mut schemas = Vec::<Schema>::new();
235        let scan_schema_key = Self::redis_generate_scan_schema_key(&self.namespace);
236        let redis_keys: Vec<String> = {
237            let mut conn = self.conn.lock_err()?;
238            conn.scan_match(&scan_schema_key)
239                .map(|iter| iter.collect::<Vec<String>>())
240                .map_err(|e| {
241                    Error::StorageMsg(format!(
242                        "[RedisStorage] failed to scan schemas: namespace={} error={}",
243                        self.namespace, e
244                    ))
245                })?
246        };
247
248        // Then read all schemas of the namespace
249        for redis_key in redis_keys.into_iter() {
250            // Another client just has removed the value with the key.
251            // It's not a problem. Just ignore it.
252            let value = {
253                let mut conn = self.conn.lock_err()?;
254                redis::cmd("GET")
255                    .arg(&redis_key)
256                    .query::<String>(&mut *conn)
257            };
258
259            if let Ok(value) = value {
260                serde_json::from_str::<Schema>(&value)
261                    .map_err(|e| {
262                        Error::StorageMsg(format!(
263                            "[RedisStorage] failed to deserialize schema={value} error={e}"
264                        ))
265                    })
266                    .map(|schema| schemas.push(schema))?;
267            }
268        }
269
270        schemas.sort_by(|a, b| a.table_name.cmp(&b.table_name));
271
272        Ok(schemas)
273    }
274
275    async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
276        let mut found = None;
277        let scan_schema_key = Self::redis_generate_scan_schema_key(&self.namespace);
278        let redis_keys: Vec<String> = {
279            let mut conn = self.conn.lock_err()?;
280            conn.scan_match(&scan_schema_key)
281                .map(|iter| iter.collect::<Vec<String>>())
282                .map_err(|e| {
283                    Error::StorageMsg(format!(
284                        "[RedisStorage] failed to scan schemas: namespace={} error={}",
285                        self.namespace, e
286                    ))
287                })?
288        };
289
290        // Then read all schemas of the namespace
291        for redis_key in redis_keys.into_iter() {
292            // Another client just has removed the value with the key.
293            // It's not a problem. Just ignore it.
294            let value = {
295                let mut conn = self.conn.lock_err()?;
296                redis::cmd("GET")
297                    .arg(&redis_key)
298                    .query::<String>(&mut *conn)
299            };
300
301            if let Ok(value) = value {
302                serde_json::from_str::<Schema>(&value)
303                    .map_err(|e| {
304                        Error::StorageMsg(format!(
305                            "[RedisStorage] failed to deserialize schema={value} error={e}"
306                        ))
307                    })
308                    .map(|schema| {
309                        if schema.table_name == table_name {
310                            found = Some(schema);
311                        }
312                    })?;
313            }
314
315            if found.is_some() {
316                break;
317            }
318        }
319
320        Ok(found)
321    }
322
323    async fn fetch_data(&self, table_name: &str, key: &Key) -> Result<Option<DataRow>> {
324        let key = Self::redis_generate_key(&self.namespace, table_name, key)?;
325        // It's not a problem if the value with the key is removed by another client.
326        let value = {
327            let mut conn = self.conn.lock_err()?;
328            redis::cmd("GET").arg(&key).query::<String>(&mut *conn)
329        };
330        if let Ok(value) = value {
331            return serde_json::from_str::<DataRow>(&value)
332                .map_err(|e| {
333                    Error::StorageMsg(format!(
334                        "[RedisStorage] failed to deserialize value={value} error={e:?}"
335                    ))
336                })
337                .map(Some);
338        }
339        Ok(None)
340    }
341
342    async fn scan_data<'a>(&'a self, table_name: &str) -> Result<RowIter<'a>> {
343        // First read all keys of the table
344        let redis_keys: Vec<String> = {
345            let mut conn = self.conn.lock_err()?;
346            conn.scan_match(Self::redis_generate_scankey(&self.namespace, table_name))
347                .map(|iter| iter.collect::<Vec<String>>())
348                .map_err(|e| {
349                    Error::StorageMsg(format!(
350                        "[RedisStorage] failed to scan data: namespace={} table_name={} error={}",
351                        self.namespace, table_name, e
352                    ))
353                })?
354        };
355
356        let mut rows = BTreeMap::new();
357        for redis_key in redis_keys.into_iter() {
358            // Another client just has removed the value with the key.
359            // It's not a problem. Just ignore it.
360            let value = {
361                let mut conn = self.conn.lock_err()?;
362                redis::cmd("GET")
363                    .arg(&redis_key)
364                    .query::<String>(&mut *conn)
365            };
366            let value = match value {
367                Ok(v) => v,
368                Err(_) => continue,
369            };
370
371            let key = Self::redis_parse_key(&redis_key).map_err(|e| {
372                Error::StorageMsg(format!(
373                    "[RedisStorage] Wrong key format: key={redis_key} error={e}"
374                ))
375            })?;
376
377            let row = serde_json::from_str::<DataRow>(&value).map_err(|e| {
378                Error::StorageMsg(format!(
379                    "[RedisStorage] failed to deserialize value={value} error={e:?}"
380                ))
381            })?;
382
383            rows.insert(key, row);
384        }
385
386        Ok(Box::pin(iter(rows.into_iter().map(Ok))))
387    }
388}
389
390#[async_trait]
391impl StoreMut for RedisStorage {
392    async fn insert_schema(&mut self, schema: &Schema) -> Result<()> {
393        let current_time = Value::Timestamp(Utc::now().naive_utc());
394        let current_time_value = serde_json::to_string(&current_time).map_err(|e| {
395            Error::StorageMsg(format!(
396                "[RedisStorage] failed to serialize metadata={current_time:?} error={e}"
397            ))
398        })?;
399        let metadata_key =
400            Self::redis_generate_metadata_key(&self.namespace, &schema.table_name, "CREATED");
401        self.redis_execute_set(&metadata_key, &current_time_value)?;
402
403        let table_name = schema.table_name.clone();
404        let metadata_key =
405            Self::redis_generate_metadata_key(&self.namespace, &table_name, "CREATED");
406        let metadata_value = serde_json::to_string(&current_time).map_err(|e| {
407            Error::StorageMsg(format!(
408                "[RedisStorage] failed to serialize metadata={current_time:?} error={e}"
409            ))
410        })?;
411        self.redis_execute_set(&metadata_key, &metadata_value)?;
412
413        // Finally it's ok to store the new schema
414        self.redis_store_schema(schema)?;
415
416        Ok(())
417    }
418
419    async fn delete_schema(&mut self, table_name: &str) -> Result<()> {
420        let redis_key_iter: Vec<String> = self.redis_execute_scan(table_name)?;
421        for key in redis_key_iter {
422            self.redis_execute_del(&key)?;
423        }
424
425        // delete metadata
426        let metadata_scan_key = Self::redis_generate_scan_metadata_key(&self.namespace, table_name);
427        let metadata_redis_keys: Vec<String> = {
428            let mut conn = self.conn.lock_err()?;
429            conn.scan_match(&metadata_scan_key)
430                .map(|iter| iter.collect::<Vec<String>>())
431                .map_err(|e| {
432                    Error::StorageMsg(format!(
433                        "[RedisStorage] failed to scan metadata: namespace={} table_name={} error={}",
434                        self.namespace, table_name, e
435                    ))
436                })?
437        };
438        for key in metadata_redis_keys {
439            self.redis_execute_del(&key)?;
440        }
441
442        self.redis_delete_schema(table_name)?;
443
444        Ok(())
445    }
446
447    async fn append_data(&mut self, table_name: &str, rows: Vec<DataRow>) -> Result<()> {
448        for row in rows {
449            // Even multiple clients can get an unique value with INCR command.
450            // and a shared key "globalkey"
451            let k = {
452                let mut conn = self.conn.lock_err()?;
453                redis::cmd("INCR")
454                    .arg("globalkey")
455                    .query::<i64>(&mut *conn)
456                    .map_err(|_| {
457                        Error::StorageMsg("[RedisStorage] failed to execute INCR".to_owned())
458                    })?
459            };
460            let key = Key::I64(k);
461            let redis_key = Self::redis_generate_key(&self.namespace, table_name, &key)?;
462            let value = serde_json::to_string(&row).map_err(|e| {
463                Error::StorageMsg(format!(
464                    "[RedisStorage] failed to serialize row={row:?} error={e}"
465                ))
466            })?;
467
468            self.redis_execute_set(&redis_key, &value)?;
469        }
470
471        Ok(())
472    }
473
474    async fn insert_data(&mut self, table_name: &str, rows: Vec<(Key, DataRow)>) -> Result<()> {
475        for (key, row) in rows {
476            let redis_key = Self::redis_generate_key(&self.namespace, table_name, &key)?;
477            let value = serde_json::to_string(&row).map_err(|e| {
478                Error::StorageMsg(format!(
479                    "[RedisStorage] failed to serialize row={row:?} error={e}"
480                ))
481            })?;
482            self.redis_execute_set(&redis_key, &value)?;
483        }
484
485        Ok(())
486    }
487
488    async fn delete_data(&mut self, table_name: &str, keys: Vec<Key>) -> Result<()> {
489        for key in keys {
490            let redis_key = Self::redis_generate_key(&self.namespace, table_name, &key)?;
491            self.redis_execute_del(&redis_key)?;
492        }
493
494        Ok(())
495    }
496}