1use async_trait::async_trait;
2use redis::AsyncCommands;
3use serde_json::Value;
4use synaptic_core::{Item, SynapticError};
5
6use crate::connection::{collect_matching_keys, RedisBackend, RedisConn};
7
8#[derive(Debug, Clone)]
10pub struct RedisStoreConfig {
11 pub prefix: String,
13}
14
15impl Default for RedisStoreConfig {
16 fn default() -> Self {
17 Self {
18 prefix: "synaptic:store:".to_string(),
19 }
20 }
21}
22
23pub struct RedisStore {
31 backend: RedisBackend,
32 config: RedisStoreConfig,
33}
34
35impl RedisStore {
36 pub fn from_url(url: &str) -> Result<Self, SynapticError> {
42 Ok(Self {
43 backend: RedisBackend::standalone(url)?,
44 config: RedisStoreConfig::default(),
45 })
46 }
47
48 pub fn from_url_with_config(
50 url: &str,
51 config: RedisStoreConfig,
52 ) -> Result<Self, SynapticError> {
53 Ok(Self {
54 backend: RedisBackend::standalone(url)?,
55 config,
56 })
57 }
58
59 #[allow(dead_code)]
61 pub(crate) fn from_backend(backend: RedisBackend, config: RedisStoreConfig) -> Self {
62 Self { backend, config }
63 }
64
65 #[cfg(feature = "cluster")]
67 pub fn from_cluster_nodes(nodes: &[&str]) -> Result<Self, SynapticError> {
68 Ok(Self {
69 backend: RedisBackend::cluster(nodes)?,
70 config: RedisStoreConfig::default(),
71 })
72 }
73
74 #[cfg(feature = "cluster")]
76 pub fn from_cluster_nodes_with_config(
77 nodes: &[&str],
78 config: RedisStoreConfig,
79 ) -> Result<Self, SynapticError> {
80 Ok(Self {
81 backend: RedisBackend::cluster(nodes)?,
82 config,
83 })
84 }
85
86 fn redis_key(&self, namespace: &[&str], key: &str) -> String {
88 let ns = namespace.join(":");
89 if ns.is_empty() {
90 format!("{}:{}", self.config.prefix.trim_end_matches(':'), key)
91 } else {
92 format!("{}{ns}:{key}", self.config.prefix)
93 }
94 }
95
96 fn namespace_index_key(&self) -> String {
98 format!("{}__namespaces__", self.config.prefix)
99 }
100
101 fn scan_pattern(&self, namespace: &[&str]) -> String {
103 let ns = namespace.join(":");
104 if ns.is_empty() {
105 format!("{}*", self.config.prefix)
106 } else {
107 format!("{}{ns}:*", self.config.prefix)
108 }
109 }
110
111 fn encode_namespace(namespace: &[&str]) -> String {
113 namespace.join(":")
114 }
115
116 async fn get_connection(&self) -> Result<RedisConn, SynapticError> {
117 self.backend.get_connection().await
118 }
119}
120
121fn now_iso() -> String {
122 format!("{:?}", std::time::SystemTime::now())
123}
124
125async fn redis_get_string(con: &mut RedisConn, key: &str) -> Result<Option<String>, SynapticError> {
127 let raw: Option<String> = con
128 .get(key)
129 .await
130 .map_err(|e| SynapticError::Store(format!("Redis GET error: {e}")))?;
131 Ok(raw)
132}
133
134#[async_trait]
135impl synaptic_core::Store for RedisStore {
136 async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
137 let mut con = self.get_connection().await?;
138 let redis_key = self.redis_key(namespace, key);
139
140 let raw = redis_get_string(&mut con, &redis_key).await?;
141
142 match raw {
143 Some(json_str) => {
144 let item: Item = serde_json::from_str(&json_str)
145 .map_err(|e| SynapticError::Store(format!("JSON deserialize error: {e}")))?;
146 Ok(Some(item))
147 }
148 None => Ok(None),
149 }
150 }
151
152 async fn search(
153 &self,
154 namespace: &[&str],
155 query: Option<&str>,
156 limit: usize,
157 ) -> Result<Vec<Item>, SynapticError> {
158 let mut con = self.get_connection().await?;
159 let pattern = self.scan_pattern(namespace);
160 let ns_index_key = self.namespace_index_key();
161
162 let all_keys = collect_matching_keys(&mut con, &pattern).await?;
164
165 let keys: Vec<String> = all_keys
167 .into_iter()
168 .filter(|k| k != &ns_index_key)
169 .collect();
170
171 let mut items: Vec<Item> = Vec::new();
173 for k in &keys {
174 let raw = redis_get_string(&mut con, k).await?;
175 if let Some(json_str) = raw {
176 if let Ok(item) = serde_json::from_str::<Item>(&json_str) {
177 if let Some(q) = query {
179 if item.key.contains(q) || item.value.to_string().contains(q) {
180 items.push(item);
181 }
182 } else {
183 items.push(item);
184 }
185 }
186 }
187 if items.len() >= limit {
188 break;
189 }
190 }
191
192 items.truncate(limit);
193 Ok(items)
194 }
195
196 async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
197 let mut con = self.get_connection().await?;
198 let redis_key = self.redis_key(namespace, key);
199 let ns_index_key = self.namespace_index_key();
200 let ns_encoded = Self::encode_namespace(namespace);
201
202 let existing = redis_get_string(&mut con, &redis_key).await?;
204
205 let now = now_iso();
206 let created_at = existing
207 .as_ref()
208 .and_then(|json_str| serde_json::from_str::<Item>(json_str).ok())
209 .map(|item| item.created_at)
210 .unwrap_or_else(|| now.clone());
211
212 let item = Item {
213 namespace: namespace.iter().map(|s| s.to_string()).collect(),
214 key: key.to_string(),
215 value,
216 created_at,
217 updated_at: now,
218 score: None,
219 };
220
221 let json_str = serde_json::to_string(&item)
222 .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
223
224 con.set::<_, _, ()>(&redis_key, &json_str)
225 .await
226 .map_err(|e| SynapticError::Store(format!("Redis SET error: {e}")))?;
227
228 con.sadd::<_, _, ()>(&ns_index_key, &ns_encoded)
230 .await
231 .map_err(|e| SynapticError::Store(format!("Redis SADD error: {e}")))?;
232
233 Ok(())
234 }
235
236 async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
237 let mut con = self.get_connection().await?;
238 let redis_key = self.redis_key(namespace, key);
239
240 con.del::<_, ()>(&redis_key)
241 .await
242 .map_err(|e| SynapticError::Store(format!("Redis DEL error: {e}")))?;
243
244 Ok(())
245 }
246
247 async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
248 let mut con = self.get_connection().await?;
249 let ns_index_key = self.namespace_index_key();
250
251 let members: Vec<String> = con
252 .smembers(&ns_index_key)
253 .await
254 .map_err(|e| SynapticError::Store(format!("Redis SMEMBERS error: {e}")))?;
255
256 let prefix_str = if prefix.is_empty() {
257 String::new()
258 } else {
259 prefix.join(":")
260 };
261
262 let namespaces: Vec<Vec<String>> = members
263 .into_iter()
264 .filter(|ns| prefix.is_empty() || ns.starts_with(&prefix_str))
265 .map(|ns| ns.split(':').map(String::from).collect())
266 .collect();
267
268 Ok(namespaces)
269 }
270}