bubble_db/
redis.rs

1use crate::{DatabaseConfig, DatabaseConnection, DbResult};
2use async_trait::async_trait;
3use redis::{Client, Commands};
4use std::collections::HashMap;
5
6#[derive(Debug)]
7pub struct RedisConnection {
8    client: Client,
9}
10
11impl RedisConnection {
12    pub async fn connect(config: &DatabaseConfig) -> DbResult<Self> {
13        let client = Client::open(config.connection_string()).map_err(|e| e.to_string())?;
14        Ok(Self { client })
15    }
16
17    fn get_connection(&self) -> DbResult<redis::Connection> {
18        self.client.get_connection().map_err(|e| e.to_string())
19    }
20}
21
22#[async_trait]
23impl DatabaseConnection for RedisConnection {
24    async fn execute(&self, sql: &str) -> DbResult<u64> {
25        let mut conn = self.get_connection()?;
26        let parts: Vec<&str> = sql.split_whitespace().collect();
27        if parts.is_empty() {
28            return Ok(0);
29        }
30        match parts[0].to_uppercase().as_str() {
31            "SET" if parts.len() >= 3 => {
32                let key = parts[1];
33                let value = parts[2..].join(" ");
34                let _: () = redis::cmd("SET")
35                    .arg(key)
36                    .arg(value)
37                    .query(&mut conn)
38                    .map_err(|e| e.to_string())?;
39                Ok(1)
40            }
41            "DEL" if parts.len() >= 2 => {
42                let keys = &parts[1..];
43                let count: u64 = redis::cmd("DEL")
44                    .arg(keys)
45                    .query(&mut conn)
46                    .map_err(|e| e.to_string())?;
47                Ok(count)
48            }
49            "HSET" if parts.len() >= 4 => {
50                let key = parts[1];
51                let field = parts[2];
52                let value = parts[3..].join(" ");
53                let _: () = redis::cmd("HSET")
54                    .arg(key)
55                    .arg(field)
56                    .arg(value)
57                    .query(&mut conn)
58                    .map_err(|e| e.to_string())?;
59                Ok(1)
60            }
61            _ => Err("Unsupported Redis command".to_string()),
62        }
63    }
64
65    async fn query(&self, sql: &str) -> DbResult<String> {
66        let mut conn = self.get_connection()?;
67        let parts: Vec<&str> = sql.split_whitespace().collect();
68        match parts[0].to_uppercase().as_str() {
69            "GET" if parts.len() == 2 => {
70                let value: Option<String> = conn.get(parts[1]).map_err(|e| e.to_string())?;
71
72                let result = if let Some(val) = value {
73                    serde_json::json!({ "value": val })
74                } else {
75                    serde_json::json!([])
76                };
77
78                serde_json::to_string(&result).map_err(|e| e.to_string())
79            }
80            "HGETALL" if parts.len() == 2 => {
81                let map: HashMap<String, String> =
82                    conn.hgetall(parts[1]).map_err(|e| e.to_string())?;
83
84                serde_json::to_string(&map).map_err(|e| e.to_string())
85            }
86            _ => Err("Unsupported Redis query".to_string()),
87        }
88    }
89
90    async fn query_one(&self, sql: &str) -> DbResult<String> {
91        let mut conn = self.get_connection()?;
92        let parts: Vec<&str> = sql.split_whitespace().collect();
93        match parts[0].to_uppercase().as_str() {
94            "GET" if parts.len() == 2 => {
95                let value: Option<String> = conn.get(parts[1]).map_err(|e| e.to_string())?;
96
97                if let Some(val) = value {
98                    serde_json::to_string(&serde_json::json!({ "value": val }))
99                        .map_err(|e| e.to_string())
100                } else {
101                    Err("No data found".to_string())
102                }
103            }
104            "HGETALL" if parts.len() == 2 => {
105                let map: HashMap<String, String> =
106                    conn.hgetall(parts[1]).map_err(|e| e.to_string())?;
107
108                serde_json::to_string(&map).map_err(|e| e.to_string())
109            }
110            _ => Err("Unsupported Redis query".to_string()),
111        }
112    }
113
114    async fn insert_batch(&self, table: &str, json_data: &str) -> DbResult<u64> {
115        let items: Vec<serde_json::Value> = serde_json::from_str(json_data)
116            .map_err(|e| format!("Failed to parse JSON data: {}", e))?;
117        let mut conn = self.get_connection()?;
118        let mut count = 0;
119        for (i, item) in items.iter().enumerate() {
120            let value = crate::to_sql_value(item)?;
121            let key = format!("{}:{}", table, i);
122            let _: () = conn.set(&key, value).map_err(|e| e.to_string())?;
123            count += 1;
124        }
125        Ok(count)
126    }
127}