1use async_trait::async_trait;
2use redis::AsyncCommands;
3use serde_json::Value;
4use synaptic_core::{Item, SynapticError};
5
6#[derive(Debug, Clone)]
8pub struct RedisStoreConfig {
9 pub prefix: String,
11}
12
13impl Default for RedisStoreConfig {
14 fn default() -> Self {
15 Self {
16 prefix: "synaptic:store:".to_string(),
17 }
18 }
19}
20
21pub struct RedisStore {
27 client: redis::Client,
28 config: RedisStoreConfig,
29}
30
31impl RedisStore {
32 pub fn new(client: redis::Client, config: RedisStoreConfig) -> Self {
34 Self { client, config }
35 }
36
37 pub fn from_url(url: &str) -> Result<Self, SynapticError> {
43 let client = redis::Client::open(url)
44 .map_err(|e| SynapticError::Store(format!("failed to connect to Redis: {e}")))?;
45 Ok(Self {
46 client,
47 config: RedisStoreConfig::default(),
48 })
49 }
50
51 pub fn from_url_with_config(
53 url: &str,
54 config: RedisStoreConfig,
55 ) -> Result<Self, SynapticError> {
56 let client = redis::Client::open(url)
57 .map_err(|e| SynapticError::Store(format!("failed to connect to Redis: {e}")))?;
58 Ok(Self { client, config })
59 }
60
61 fn redis_key(&self, namespace: &[&str], key: &str) -> String {
63 let ns = namespace.join(":");
64 if ns.is_empty() {
65 format!("{}:{}", self.config.prefix.trim_end_matches(':'), key)
66 } else {
67 format!("{}{ns}:{key}", self.config.prefix)
68 }
69 }
70
71 fn namespace_index_key(&self) -> String {
73 format!("{}__namespaces__", self.config.prefix)
74 }
75
76 fn scan_pattern(&self, namespace: &[&str]) -> String {
78 let ns = namespace.join(":");
79 if ns.is_empty() {
80 format!("{}*", self.config.prefix)
81 } else {
82 format!("{}{ns}:*", self.config.prefix)
83 }
84 }
85
86 fn encode_namespace(namespace: &[&str]) -> String {
88 namespace.join(":")
89 }
90
91 async fn get_connection(
92 &self,
93 ) -> Result<redis::aio::MultiplexedConnection, SynapticError> {
94 self.client
95 .get_multiplexed_async_connection()
96 .await
97 .map_err(|e| SynapticError::Store(format!("Redis connection error: {e}")))
98 }
99}
100
101fn now_iso() -> String {
102 format!("{:?}", std::time::SystemTime::now())
103}
104
105async fn redis_get_string(
107 con: &mut redis::aio::MultiplexedConnection,
108 key: &str,
109) -> Result<Option<String>, SynapticError> {
110 let raw: Option<String> = con
111 .get(key)
112 .await
113 .map_err(|e| SynapticError::Store(format!("Redis GET error: {e}")))?;
114 Ok(raw)
115}
116
117#[async_trait]
118impl synaptic_core::Store for RedisStore {
119 async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
120 let mut con = self.get_connection().await?;
121 let redis_key = self.redis_key(namespace, key);
122
123 let raw = redis_get_string(&mut con, &redis_key).await?;
124
125 match raw {
126 Some(json_str) => {
127 let item: Item = serde_json::from_str(&json_str)
128 .map_err(|e| SynapticError::Store(format!("JSON deserialize error: {e}")))?;
129 Ok(Some(item))
130 }
131 None => Ok(None),
132 }
133 }
134
135 async fn search(
136 &self,
137 namespace: &[&str],
138 query: Option<&str>,
139 limit: usize,
140 ) -> Result<Vec<Item>, SynapticError> {
141 let mut con = self.get_connection().await?;
142 let pattern = self.scan_pattern(namespace);
143 let ns_index_key = self.namespace_index_key();
144
145 let mut keys: Vec<String> = Vec::new();
147 let mut cursor: u64 = 0;
148 loop {
149 let (next_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
150 .arg(cursor)
151 .arg("MATCH")
152 .arg(&pattern)
153 .arg("COUNT")
154 .arg(100)
155 .query_async(&mut con)
156 .await
157 .map_err(|e| SynapticError::Store(format!("Redis SCAN error: {e}")))?;
158
159 for k in batch {
161 if k != ns_index_key {
162 keys.push(k);
163 }
164 }
165 cursor = next_cursor;
166 if cursor == 0 {
167 break;
168 }
169 }
170
171 let mut items: Vec<Item> = Vec::new();
173 for k in &keys {
174 let raw = redis_get_string(&mut con, k).await?;
175 if let Some(json_str) = raw {
176 if let Ok(item) = serde_json::from_str::<Item>(&json_str) {
177 if let Some(q) = query {
179 if item.key.contains(q) || item.value.to_string().contains(q) {
180 items.push(item);
181 }
182 } else {
183 items.push(item);
184 }
185 }
186 }
187 if items.len() >= limit {
188 break;
189 }
190 }
191
192 items.truncate(limit);
193 Ok(items)
194 }
195
196 async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
197 let mut con = self.get_connection().await?;
198 let redis_key = self.redis_key(namespace, key);
199 let ns_index_key = self.namespace_index_key();
200 let ns_encoded = Self::encode_namespace(namespace);
201
202 let existing = redis_get_string(&mut con, &redis_key).await?;
204
205 let now = now_iso();
206 let created_at = existing
207 .as_ref()
208 .and_then(|json_str| serde_json::from_str::<Item>(json_str).ok())
209 .map(|item| item.created_at)
210 .unwrap_or_else(|| now.clone());
211
212 let item = Item {
213 namespace: namespace.iter().map(|s| s.to_string()).collect(),
214 key: key.to_string(),
215 value,
216 created_at,
217 updated_at: now,
218 score: None,
219 };
220
221 let json_str = serde_json::to_string(&item)
222 .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
223
224 con.set::<_, _, ()>(&redis_key, &json_str)
225 .await
226 .map_err(|e| SynapticError::Store(format!("Redis SET error: {e}")))?;
227
228 con.sadd::<_, _, ()>(&ns_index_key, &ns_encoded)
230 .await
231 .map_err(|e| SynapticError::Store(format!("Redis SADD error: {e}")))?;
232
233 Ok(())
234 }
235
236 async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
237 let mut con = self.get_connection().await?;
238 let redis_key = self.redis_key(namespace, key);
239
240 con.del::<_, ()>(&redis_key)
241 .await
242 .map_err(|e| SynapticError::Store(format!("Redis DEL error: {e}")))?;
243
244 Ok(())
245 }
246
247 async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
248 let mut con = self.get_connection().await?;
249 let ns_index_key = self.namespace_index_key();
250
251 let members: Vec<String> = con
252 .smembers(&ns_index_key)
253 .await
254 .map_err(|e| SynapticError::Store(format!("Redis SMEMBERS error: {e}")))?;
255
256 let prefix_str = if prefix.is_empty() {
257 String::new()
258 } else {
259 prefix.join(":")
260 };
261
262 let namespaces: Vec<Vec<String>> = members
263 .into_iter()
264 .filter(|ns| prefix.is_empty() || ns.starts_with(&prefix_str))
265 .map(|ns| ns.split(':').map(String::from).collect())
266 .collect();
267
268 Ok(namespaces)
269 }
270}