1#![deny(clippy::str_to_string)]
2
3mod alter_table;
4mod index;
5mod metadata;
6mod mutex;
7mod transaction;
8
9use mutex::MutexExt;
10use {
11 async_trait::async_trait,
12 futures::stream::iter,
13 gluesql_core::{
14 chrono::Utc,
15 data::{CustomFunction as StructCustomFunction, Key, Schema, Value},
16 error::{Error, Result},
17 store::{CustomFunction, CustomFunctionMut, DataRow, RowIter, Store, StoreMut},
18 },
19 redis::{Commands, Connection},
20 std::{collections::BTreeMap, sync::Mutex},
21};
22
23pub struct RedisStorage {
24 pub namespace: String,
25 pub conn: Mutex<Connection>,
26}
27
28impl RedisStorage {
29 pub fn new(namespace: &str, url: &str, port: u16) -> Self {
30 let redis_url = format!("redis://{url}:{port}");
31 let conn = redis::Client::open(redis_url)
32 .expect("Invalid connection URL")
33 .get_connection()
34 .expect("failed to connect to Redis");
35
36 RedisStorage {
37 namespace: namespace.to_owned(),
38 conn: Mutex::new(conn),
39 }
40 }
41
42 fn redis_generate_key(namespace: &str, table_name: &str, key: &Key) -> Result<String> {
51 let k = serde_json::to_string(key).map_err(|e| {
52 Error::StorageMsg(format!(
53 "[RedisStorage] failed to serialize key key:{key:?}, error={e}"
54 ))
55 })?;
56 Ok(format!("{namespace}#{table_name}#{k}"))
57 }
58
59 pub fn redis_parse_key(redis_key: &str) -> Result<Key> {
63 let split_key = redis_key.split('#').collect::<Vec<&str>>();
64 serde_json::from_str(split_key[2]).map_err(|e| {
65 Error::StorageMsg(format!(
66 "[RedisStorage] failed to deserialize key: key={redis_key} error={e}"
67 ))
68 })
69 }
70
71 fn redis_generate_scankey(namespace: &str, tablename: &str) -> String {
75 format!("{namespace}#{tablename}#*")
80 }
81
82 fn redis_generate_schema_key(namespace: &str, table_name: &str) -> String {
86 format!("#schema#{namespace}#{table_name}#")
87 }
88
89 fn redis_generate_scan_schema_key(namespace: &str) -> String {
90 format!("#schema#{namespace}#*")
91 }
92
93 fn redis_generate_metadata_key(
94 namespace: &str,
95 tablename: &str,
96 metadata_name: &str,
97 ) -> String {
98 format!("#metadata#{namespace}#{tablename}#{metadata_name}#")
99 }
100
101 fn redis_generate_scan_metadata_key(namespace: &str, tablename: &str) -> String {
102 format!("#metadata#{namespace}#{tablename}#*")
103 }
104
105 fn redis_generate_scan_all_metadata_key(namespace: &str) -> String {
106 format!("#metadata#{namespace}#*")
107 }
108
109 fn redis_execute_get(&mut self, key: &str) -> Result<Option<String>> {
110 let mut conn = self.conn.lock_err()?;
111 let value = redis::cmd("GET")
112 .arg(key)
113 .query::<String>(&mut *conn)
114 .map_err(|e| {
115 Error::StorageMsg(format!(
116 "[RedisStorage] failed to execute GET: key={key} error={e}"
117 ))
118 })?;
119
120 Ok(Some(value))
121 }
122
123 fn redis_execute_set(&mut self, key: &str, value: &str) -> Result<()> {
124 let mut conn = self.conn.lock_err()?;
125 redis::cmd("SET")
126 .arg(key)
127 .arg(value)
128 .query::<()>(&mut *conn)
129 .map_err(|e| {
130 Error::StorageMsg(format!(
131 "[RedisStorage] failed to execute SET: key={key} value={value} error={e}"
132 ))
133 })?;
134
135 Ok(())
136 }
137
138 pub fn redis_execute_del(&mut self, key: &str) -> Result<()> {
139 let mut conn = self.conn.lock_err()?;
140 redis::cmd("DEL")
141 .arg(key)
142 .query::<()>(&mut *conn)
143 .map_err(|e| {
144 Error::StorageMsg(format!(
145 "[RedisStorage] failed to execute DEL: key={key} error={e}"
146 ))
147 })?;
148
149 Ok(())
150 }
151
152 pub fn redis_execute_scan(&mut self, table_name: &str) -> Result<Vec<String>> {
153 let key = Self::redis_generate_scankey(&self.namespace, table_name);
154 let redis_keys: Vec<String> = {
155 let mut conn = self.conn.lock_err()?;
156 conn.scan_match(&key)
157 .map(|iter| iter.collect::<Vec<String>>())
158 .map_err(|e| {
159 Error::StorageMsg(format!(
160 "[RedisStorage] failed to scan data: key={key} error={e}"
161 ))
162 })?
163 };
164
165 Ok(redis_keys)
166 }
167
168 pub fn redis_store_schema(&mut self, schema: &Schema) -> Result<()> {
169 let schema_value = serde_json::to_string(schema).map_err(|e| {
170 Error::StorageMsg(format!(
171 "[RedisStorage] failed to serialize schema={schema:?} error={e}"
172 ))
173 })?;
174 let schema_key = Self::redis_generate_schema_key(&self.namespace, &schema.table_name);
175 self.redis_execute_set(&schema_key, &schema_value)?;
176
177 Ok(())
178 }
179
180 pub fn redis_delete_schema(&mut self, table_name: &str) -> Result<()> {
181 let schema_key = Self::redis_generate_schema_key(&self.namespace, table_name);
182 if let Ok(Some(schema_value)) = self.redis_execute_get(&schema_key) {
184 let schema = serde_json::from_str::<Schema>(&schema_value).map_err(|e| {
185 Error::StorageMsg(format!(
186 "[RedisStorage] failed to deserialize schema={schema_value:?} error={e}"
187 ))
188 })?;
189 if schema.table_name == table_name {
190 self.redis_execute_del(&schema_key)?;
191 }
192 }
193
194 Ok(())
195 }
196}
197
198#[async_trait]
199impl CustomFunction for RedisStorage {
200 async fn fetch_function<'a>(
201 &'a self,
202 _func_name: &str,
203 ) -> Result<Option<&'a StructCustomFunction>> {
204 Err(Error::StorageMsg(
205 "[RedisStorage] fetch_function is not supported yet".to_owned(),
206 ))
207 }
208
209 async fn fetch_all_functions<'a>(&'a self) -> Result<Vec<&'a StructCustomFunction>> {
210 Err(Error::StorageMsg(
211 "[RedisStorage] fetch_all_functions is not supported yet".to_owned(),
212 ))
213 }
214}
215
216#[async_trait]
217impl CustomFunctionMut for RedisStorage {
218 async fn insert_function(&mut self, _func: StructCustomFunction) -> Result<()> {
219 Err(Error::StorageMsg(
220 "[RedisStorage] insert_function is not supported yet".to_owned(),
221 ))
222 }
223
224 async fn delete_function(&mut self, _func_name: &str) -> Result<()> {
225 Err(Error::StorageMsg(
226 "[RedisStorage] delete_function is not supported yet".to_owned(),
227 ))
228 }
229}
230
231#[async_trait]
232impl Store for RedisStorage {
233 async fn fetch_all_schemas(&self) -> Result<Vec<Schema>> {
234 let mut schemas = Vec::<Schema>::new();
235 let scan_schema_key = Self::redis_generate_scan_schema_key(&self.namespace);
236 let redis_keys: Vec<String> = {
237 let mut conn = self.conn.lock_err()?;
238 conn.scan_match(&scan_schema_key)
239 .map(|iter| iter.collect::<Vec<String>>())
240 .map_err(|e| {
241 Error::StorageMsg(format!(
242 "[RedisStorage] failed to scan schemas: namespace={} error={}",
243 self.namespace, e
244 ))
245 })?
246 };
247
248 for redis_key in redis_keys.into_iter() {
250 let value = {
253 let mut conn = self.conn.lock_err()?;
254 redis::cmd("GET")
255 .arg(&redis_key)
256 .query::<String>(&mut *conn)
257 };
258
259 if let Ok(value) = value {
260 serde_json::from_str::<Schema>(&value)
261 .map_err(|e| {
262 Error::StorageMsg(format!(
263 "[RedisStorage] failed to deserialize schema={value} error={e}"
264 ))
265 })
266 .map(|schema| schemas.push(schema))?;
267 }
268 }
269
270 schemas.sort_by(|a, b| a.table_name.cmp(&b.table_name));
271
272 Ok(schemas)
273 }
274
275 async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
276 let mut found = None;
277 let scan_schema_key = Self::redis_generate_scan_schema_key(&self.namespace);
278 let redis_keys: Vec<String> = {
279 let mut conn = self.conn.lock_err()?;
280 conn.scan_match(&scan_schema_key)
281 .map(|iter| iter.collect::<Vec<String>>())
282 .map_err(|e| {
283 Error::StorageMsg(format!(
284 "[RedisStorage] failed to scan schemas: namespace={} error={}",
285 self.namespace, e
286 ))
287 })?
288 };
289
290 for redis_key in redis_keys.into_iter() {
292 let value = {
295 let mut conn = self.conn.lock_err()?;
296 redis::cmd("GET")
297 .arg(&redis_key)
298 .query::<String>(&mut *conn)
299 };
300
301 if let Ok(value) = value {
302 serde_json::from_str::<Schema>(&value)
303 .map_err(|e| {
304 Error::StorageMsg(format!(
305 "[RedisStorage] failed to deserialize schema={value} error={e}"
306 ))
307 })
308 .map(|schema| {
309 if schema.table_name == table_name {
310 found = Some(schema);
311 }
312 })?;
313 }
314
315 if found.is_some() {
316 break;
317 }
318 }
319
320 Ok(found)
321 }
322
323 async fn fetch_data(&self, table_name: &str, key: &Key) -> Result<Option<DataRow>> {
324 let key = Self::redis_generate_key(&self.namespace, table_name, key)?;
325 let value = {
327 let mut conn = self.conn.lock_err()?;
328 redis::cmd("GET").arg(&key).query::<String>(&mut *conn)
329 };
330 if let Ok(value) = value {
331 return serde_json::from_str::<DataRow>(&value)
332 .map_err(|e| {
333 Error::StorageMsg(format!(
334 "[RedisStorage] failed to deserialize value={value} error={e:?}"
335 ))
336 })
337 .map(Some);
338 }
339 Ok(None)
340 }
341
342 async fn scan_data<'a>(&'a self, table_name: &str) -> Result<RowIter<'a>> {
343 let redis_keys: Vec<String> = {
345 let mut conn = self.conn.lock_err()?;
346 conn.scan_match(Self::redis_generate_scankey(&self.namespace, table_name))
347 .map(|iter| iter.collect::<Vec<String>>())
348 .map_err(|e| {
349 Error::StorageMsg(format!(
350 "[RedisStorage] failed to scan data: namespace={} table_name={} error={}",
351 self.namespace, table_name, e
352 ))
353 })?
354 };
355
356 let mut rows = BTreeMap::new();
357 for redis_key in redis_keys.into_iter() {
358 let value = {
361 let mut conn = self.conn.lock_err()?;
362 redis::cmd("GET")
363 .arg(&redis_key)
364 .query::<String>(&mut *conn)
365 };
366 let value = match value {
367 Ok(v) => v,
368 Err(_) => continue,
369 };
370
371 let key = Self::redis_parse_key(&redis_key).map_err(|e| {
372 Error::StorageMsg(format!(
373 "[RedisStorage] Wrong key format: key={redis_key} error={e}"
374 ))
375 })?;
376
377 let row = serde_json::from_str::<DataRow>(&value).map_err(|e| {
378 Error::StorageMsg(format!(
379 "[RedisStorage] failed to deserialize value={value} error={e:?}"
380 ))
381 })?;
382
383 rows.insert(key, row);
384 }
385
386 Ok(Box::pin(iter(rows.into_iter().map(Ok))))
387 }
388}
389
390#[async_trait]
391impl StoreMut for RedisStorage {
392 async fn insert_schema(&mut self, schema: &Schema) -> Result<()> {
393 let current_time = Value::Timestamp(Utc::now().naive_utc());
394 let current_time_value = serde_json::to_string(¤t_time).map_err(|e| {
395 Error::StorageMsg(format!(
396 "[RedisStorage] failed to serialize metadata={current_time:?} error={e}"
397 ))
398 })?;
399 let metadata_key =
400 Self::redis_generate_metadata_key(&self.namespace, &schema.table_name, "CREATED");
401 self.redis_execute_set(&metadata_key, ¤t_time_value)?;
402
403 let table_name = schema.table_name.clone();
404 let metadata_key =
405 Self::redis_generate_metadata_key(&self.namespace, &table_name, "CREATED");
406 let metadata_value = serde_json::to_string(¤t_time).map_err(|e| {
407 Error::StorageMsg(format!(
408 "[RedisStorage] failed to serialize metadata={current_time:?} error={e}"
409 ))
410 })?;
411 self.redis_execute_set(&metadata_key, &metadata_value)?;
412
413 self.redis_store_schema(schema)?;
415
416 Ok(())
417 }
418
419 async fn delete_schema(&mut self, table_name: &str) -> Result<()> {
420 let redis_key_iter: Vec<String> = self.redis_execute_scan(table_name)?;
421 for key in redis_key_iter {
422 self.redis_execute_del(&key)?;
423 }
424
425 let metadata_scan_key = Self::redis_generate_scan_metadata_key(&self.namespace, table_name);
427 let metadata_redis_keys: Vec<String> = {
428 let mut conn = self.conn.lock_err()?;
429 conn.scan_match(&metadata_scan_key)
430 .map(|iter| iter.collect::<Vec<String>>())
431 .map_err(|e| {
432 Error::StorageMsg(format!(
433 "[RedisStorage] failed to scan metadata: namespace={} table_name={} error={}",
434 self.namespace, table_name, e
435 ))
436 })?
437 };
438 for key in metadata_redis_keys {
439 self.redis_execute_del(&key)?;
440 }
441
442 self.redis_delete_schema(table_name)?;
443
444 Ok(())
445 }
446
447 async fn append_data(&mut self, table_name: &str, rows: Vec<DataRow>) -> Result<()> {
448 for row in rows {
449 let k = {
452 let mut conn = self.conn.lock_err()?;
453 redis::cmd("INCR")
454 .arg("globalkey")
455 .query::<i64>(&mut *conn)
456 .map_err(|_| {
457 Error::StorageMsg("[RedisStorage] failed to execute INCR".to_owned())
458 })?
459 };
460 let key = Key::I64(k);
461 let redis_key = Self::redis_generate_key(&self.namespace, table_name, &key)?;
462 let value = serde_json::to_string(&row).map_err(|e| {
463 Error::StorageMsg(format!(
464 "[RedisStorage] failed to serialize row={row:?} error={e}"
465 ))
466 })?;
467
468 self.redis_execute_set(&redis_key, &value)?;
469 }
470
471 Ok(())
472 }
473
474 async fn insert_data(&mut self, table_name: &str, rows: Vec<(Key, DataRow)>) -> Result<()> {
475 for (key, row) in rows {
476 let redis_key = Self::redis_generate_key(&self.namespace, table_name, &key)?;
477 let value = serde_json::to_string(&row).map_err(|e| {
478 Error::StorageMsg(format!(
479 "[RedisStorage] failed to serialize row={row:?} error={e}"
480 ))
481 })?;
482 self.redis_execute_set(&redis_key, &value)?;
483 }
484
485 Ok(())
486 }
487
488 async fn delete_data(&mut self, table_name: &str, keys: Vec<Key>) -> Result<()> {
489 for key in keys {
490 let redis_key = Self::redis_generate_key(&self.namespace, table_name, &key)?;
491 self.redis_execute_del(&redis_key)?;
492 }
493
494 Ok(())
495 }
496}