1use async_trait::async_trait;
2use redis::AsyncCommands;
3use serde_json::Value;
4use synaptic_core::{Item, SynapticError};
5
/// Settings that control how a [`RedisStore`] lays out its keys in Redis.
#[derive(Debug, Clone)]
pub struct RedisStoreConfig {
    /// String placed in front of every Redis key the store builds, including
    /// the key of the namespace index set.
    pub prefix: String,
}
12
13impl Default for RedisStoreConfig {
14 fn default() -> Self {
15 Self {
16 prefix: "synaptic:store:".to_string(),
17 }
18 }
19}
20
21pub struct RedisStore {
27 client: redis::Client,
28 config: RedisStoreConfig,
29}
30
31impl RedisStore {
32 pub fn new(client: redis::Client, config: RedisStoreConfig) -> Self {
34 Self { client, config }
35 }
36
37 pub fn from_url(url: &str) -> Result<Self, SynapticError> {
43 let client = redis::Client::open(url)
44 .map_err(|e| SynapticError::Store(format!("failed to connect to Redis: {e}")))?;
45 Ok(Self {
46 client,
47 config: RedisStoreConfig::default(),
48 })
49 }
50
51 pub fn from_url_with_config(
53 url: &str,
54 config: RedisStoreConfig,
55 ) -> Result<Self, SynapticError> {
56 let client = redis::Client::open(url)
57 .map_err(|e| SynapticError::Store(format!("failed to connect to Redis: {e}")))?;
58 Ok(Self { client, config })
59 }
60
61 fn redis_key(&self, namespace: &[&str], key: &str) -> String {
63 let ns = namespace.join(":");
64 if ns.is_empty() {
65 format!("{}:{}", self.config.prefix.trim_end_matches(':'), key)
66 } else {
67 format!("{}{ns}:{key}", self.config.prefix)
68 }
69 }
70
71 fn namespace_index_key(&self) -> String {
73 format!("{}__namespaces__", self.config.prefix)
74 }
75
76 fn scan_pattern(&self, namespace: &[&str]) -> String {
78 let ns = namespace.join(":");
79 if ns.is_empty() {
80 format!("{}*", self.config.prefix)
81 } else {
82 format!("{}{ns}:*", self.config.prefix)
83 }
84 }
85
86 fn encode_namespace(namespace: &[&str]) -> String {
88 namespace.join(":")
89 }
90
91 async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection, SynapticError> {
92 self.client
93 .get_multiplexed_async_connection()
94 .await
95 .map_err(|e| SynapticError::Store(format!("Redis connection error: {e}")))
96 }
97}
98
/// Returns the current UTC time as an ISO 8601 / RFC 3339 timestamp with
/// millisecond precision, e.g. `2024-05-17T09:30:12.345Z`.
///
/// The fixed-width layout makes timestamps sort chronologically as plain
/// strings. A system clock set before the Unix epoch is clamped to
/// `1970-01-01T00:00:00.000Z`.
fn now_iso() -> String {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    format_iso8601_utc(since_epoch.as_secs(), since_epoch.subsec_millis())
}

/// Formats `epoch_secs` seconds (plus `millis` milliseconds) since the Unix
/// epoch as `YYYY-MM-DDTHH:MM:SS.mmmZ` in the proleptic Gregorian calendar.
fn format_iso8601_utc(epoch_secs: u64, millis: u32) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    let days = epoch_secs / SECS_PER_DAY;
    let secs_of_day = epoch_secs % SECS_PER_DAY;

    // Days-to-civil conversion: shift the epoch to 0000-03-01 so that leap
    // days fall at the end of each (March-based) year, then work in 400-year
    // eras of exactly 146_097 days.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
        year,
        month,
        day,
        secs_of_day / 3_600,
        (secs_of_day % 3_600) / 60,
        secs_of_day % 60,
        millis
    )
}
102
103async fn redis_get_string(
105 con: &mut redis::aio::MultiplexedConnection,
106 key: &str,
107) -> Result<Option<String>, SynapticError> {
108 let raw: Option<String> = con
109 .get(key)
110 .await
111 .map_err(|e| SynapticError::Store(format!("Redis GET error: {e}")))?;
112 Ok(raw)
113}
114
115#[async_trait]
116impl synaptic_core::Store for RedisStore {
117 async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
118 let mut con = self.get_connection().await?;
119 let redis_key = self.redis_key(namespace, key);
120
121 let raw = redis_get_string(&mut con, &redis_key).await?;
122
123 match raw {
124 Some(json_str) => {
125 let item: Item = serde_json::from_str(&json_str)
126 .map_err(|e| SynapticError::Store(format!("JSON deserialize error: {e}")))?;
127 Ok(Some(item))
128 }
129 None => Ok(None),
130 }
131 }
132
133 async fn search(
134 &self,
135 namespace: &[&str],
136 query: Option<&str>,
137 limit: usize,
138 ) -> Result<Vec<Item>, SynapticError> {
139 let mut con = self.get_connection().await?;
140 let pattern = self.scan_pattern(namespace);
141 let ns_index_key = self.namespace_index_key();
142
143 let mut keys: Vec<String> = Vec::new();
145 let mut cursor: u64 = 0;
146 loop {
147 let (next_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
148 .arg(cursor)
149 .arg("MATCH")
150 .arg(&pattern)
151 .arg("COUNT")
152 .arg(100)
153 .query_async(&mut con)
154 .await
155 .map_err(|e| SynapticError::Store(format!("Redis SCAN error: {e}")))?;
156
157 for k in batch {
159 if k != ns_index_key {
160 keys.push(k);
161 }
162 }
163 cursor = next_cursor;
164 if cursor == 0 {
165 break;
166 }
167 }
168
169 let mut items: Vec<Item> = Vec::new();
171 for k in &keys {
172 let raw = redis_get_string(&mut con, k).await?;
173 if let Some(json_str) = raw {
174 if let Ok(item) = serde_json::from_str::<Item>(&json_str) {
175 if let Some(q) = query {
177 if item.key.contains(q) || item.value.to_string().contains(q) {
178 items.push(item);
179 }
180 } else {
181 items.push(item);
182 }
183 }
184 }
185 if items.len() >= limit {
186 break;
187 }
188 }
189
190 items.truncate(limit);
191 Ok(items)
192 }
193
194 async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
195 let mut con = self.get_connection().await?;
196 let redis_key = self.redis_key(namespace, key);
197 let ns_index_key = self.namespace_index_key();
198 let ns_encoded = Self::encode_namespace(namespace);
199
200 let existing = redis_get_string(&mut con, &redis_key).await?;
202
203 let now = now_iso();
204 let created_at = existing
205 .as_ref()
206 .and_then(|json_str| serde_json::from_str::<Item>(json_str).ok())
207 .map(|item| item.created_at)
208 .unwrap_or_else(|| now.clone());
209
210 let item = Item {
211 namespace: namespace.iter().map(|s| s.to_string()).collect(),
212 key: key.to_string(),
213 value,
214 created_at,
215 updated_at: now,
216 score: None,
217 };
218
219 let json_str = serde_json::to_string(&item)
220 .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
221
222 con.set::<_, _, ()>(&redis_key, &json_str)
223 .await
224 .map_err(|e| SynapticError::Store(format!("Redis SET error: {e}")))?;
225
226 con.sadd::<_, _, ()>(&ns_index_key, &ns_encoded)
228 .await
229 .map_err(|e| SynapticError::Store(format!("Redis SADD error: {e}")))?;
230
231 Ok(())
232 }
233
234 async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
235 let mut con = self.get_connection().await?;
236 let redis_key = self.redis_key(namespace, key);
237
238 con.del::<_, ()>(&redis_key)
239 .await
240 .map_err(|e| SynapticError::Store(format!("Redis DEL error: {e}")))?;
241
242 Ok(())
243 }
244
245 async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
246 let mut con = self.get_connection().await?;
247 let ns_index_key = self.namespace_index_key();
248
249 let members: Vec<String> = con
250 .smembers(&ns_index_key)
251 .await
252 .map_err(|e| SynapticError::Store(format!("Redis SMEMBERS error: {e}")))?;
253
254 let prefix_str = if prefix.is_empty() {
255 String::new()
256 } else {
257 prefix.join(":")
258 };
259
260 let namespaces: Vec<Vec<String>> = members
261 .into_iter()
262 .filter(|ns| prefix.is_empty() || ns.starts_with(&prefix_str))
263 .map(|ns| ns.split(':').map(String::from).collect())
264 .collect();
265
266 Ok(namespaces)
267 }
268}