1use async_trait::async_trait;
2use redis::AsyncCommands;
3use serde_json::Value;
4use synaptic_core::{encode_namespace, now_iso, Item, SynapticError};
5
6use crate::connection::{collect_matching_keys, RedisBackend, RedisConn};
7
/// Settings that control how [`RedisStore`] lays out its keys in Redis.
#[derive(Clone, Debug)]
pub struct RedisStoreConfig {
    /// String prepended to every Redis key the store reads or writes.
    pub prefix: String,
}
14
15impl Default for RedisStoreConfig {
16 fn default() -> Self {
17 Self {
18 prefix: "synaptic:store:".to_string(),
19 }
20 }
21}
22
/// Redis-backed implementation of `synaptic_core::Store`.
///
/// Each item is stored as a JSON string under a key built from the configured prefix, the
/// namespace segments and the item key.
pub struct RedisStore {
    /// Source of Redis connections; one is requested per store operation.
    backend: RedisBackend,
    /// Key-layout settings (currently only the key prefix).
    config: RedisStoreConfig,
}
34
35impl RedisStore {
36 pub fn from_url(url: &str) -> Result<Self, SynapticError> {
42 Ok(Self {
43 backend: RedisBackend::standalone(url)?,
44 config: RedisStoreConfig::default(),
45 })
46 }
47
48 pub fn from_url_with_config(
50 url: &str,
51 config: RedisStoreConfig,
52 ) -> Result<Self, SynapticError> {
53 Ok(Self {
54 backend: RedisBackend::standalone(url)?,
55 config,
56 })
57 }
58
59 #[allow(dead_code)]
61 pub(crate) fn from_backend(backend: RedisBackend, config: RedisStoreConfig) -> Self {
62 Self { backend, config }
63 }
64
65 #[cfg(feature = "cluster")]
67 pub fn from_cluster_nodes(nodes: &[&str]) -> Result<Self, SynapticError> {
68 Ok(Self {
69 backend: RedisBackend::cluster(nodes)?,
70 config: RedisStoreConfig::default(),
71 })
72 }
73
74 #[cfg(feature = "cluster")]
76 pub fn from_cluster_nodes_with_config(
77 nodes: &[&str],
78 config: RedisStoreConfig,
79 ) -> Result<Self, SynapticError> {
80 Ok(Self {
81 backend: RedisBackend::cluster(nodes)?,
82 config,
83 })
84 }
85
86 fn redis_key(&self, namespace: &[&str], key: &str) -> String {
88 let ns = namespace.join(":");
89 if ns.is_empty() {
90 format!("{}:{}", self.config.prefix.trim_end_matches(':'), key)
91 } else {
92 format!("{}{ns}:{key}", self.config.prefix)
93 }
94 }
95
96 fn namespace_index_key(&self) -> String {
98 format!("{}__namespaces__", self.config.prefix)
99 }
100
101 fn scan_pattern(&self, namespace: &[&str]) -> String {
103 let ns = namespace.join(":");
104 if ns.is_empty() {
105 format!("{}*", self.config.prefix)
106 } else {
107 format!("{}{ns}:*", self.config.prefix)
108 }
109 }
110
111 async fn get_connection(&self) -> Result<RedisConn, SynapticError> {
112 self.backend.get_connection().await
113 }
114}
115
116async fn redis_get_string(con: &mut RedisConn, key: &str) -> Result<Option<String>, SynapticError> {
118 let raw: Option<String> = con
119 .get(key)
120 .await
121 .map_err(|e| SynapticError::Store(format!("Redis GET error: {e}")))?;
122 Ok(raw)
123}
124
125#[async_trait]
126impl synaptic_core::Store for RedisStore {
127 async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
128 let mut con = self.get_connection().await?;
129 let redis_key = self.redis_key(namespace, key);
130
131 let raw = redis_get_string(&mut con, &redis_key).await?;
132
133 match raw {
134 Some(json_str) => {
135 let item: Item = serde_json::from_str(&json_str)
136 .map_err(|e| SynapticError::Store(format!("JSON deserialize error: {e}")))?;
137 Ok(Some(item))
138 }
139 None => Ok(None),
140 }
141 }
142
143 async fn search(
144 &self,
145 namespace: &[&str],
146 query: Option<&str>,
147 limit: usize,
148 ) -> Result<Vec<Item>, SynapticError> {
149 let mut con = self.get_connection().await?;
150 let pattern = self.scan_pattern(namespace);
151 let ns_index_key = self.namespace_index_key();
152
153 let all_keys = collect_matching_keys(&mut con, &pattern).await?;
155
156 let keys: Vec<String> = all_keys
158 .into_iter()
159 .filter(|k| k != &ns_index_key)
160 .collect();
161
162 let mut items: Vec<Item> = Vec::new();
164 for k in &keys {
165 let raw = redis_get_string(&mut con, k).await?;
166 if let Some(json_str) = raw {
167 if let Ok(item) = serde_json::from_str::<Item>(&json_str) {
168 if let Some(q) = query {
170 if item.key.contains(q) || item.value.to_string().contains(q) {
171 items.push(item);
172 }
173 } else {
174 items.push(item);
175 }
176 }
177 }
178 if items.len() >= limit {
179 break;
180 }
181 }
182
183 items.truncate(limit);
184 Ok(items)
185 }
186
187 async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
188 let mut con = self.get_connection().await?;
189 let redis_key = self.redis_key(namespace, key);
190 let ns_index_key = self.namespace_index_key();
191 let ns_encoded = encode_namespace(namespace);
192
193 let existing = redis_get_string(&mut con, &redis_key).await?;
195
196 let now = now_iso();
197 let created_at = existing
198 .as_ref()
199 .and_then(|json_str| serde_json::from_str::<Item>(json_str).ok())
200 .map(|item| item.created_at)
201 .unwrap_or_else(|| now.clone());
202
203 let item = Item {
204 namespace: namespace.iter().map(|s| s.to_string()).collect(),
205 key: key.to_string(),
206 value,
207 created_at,
208 updated_at: now,
209 score: None,
210 };
211
212 let json_str = serde_json::to_string(&item)
213 .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
214
215 con.set::<_, _, ()>(&redis_key, &json_str)
216 .await
217 .map_err(|e| SynapticError::Store(format!("Redis SET error: {e}")))?;
218
219 con.sadd::<_, _, ()>(&ns_index_key, &ns_encoded)
221 .await
222 .map_err(|e| SynapticError::Store(format!("Redis SADD error: {e}")))?;
223
224 Ok(())
225 }
226
227 async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
228 let mut con = self.get_connection().await?;
229 let redis_key = self.redis_key(namespace, key);
230
231 con.del::<_, ()>(&redis_key)
232 .await
233 .map_err(|e| SynapticError::Store(format!("Redis DEL error: {e}")))?;
234
235 Ok(())
236 }
237
238 async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
239 let mut con = self.get_connection().await?;
240 let ns_index_key = self.namespace_index_key();
241
242 let members: Vec<String> = con
243 .smembers(&ns_index_key)
244 .await
245 .map_err(|e| SynapticError::Store(format!("Redis SMEMBERS error: {e}")))?;
246
247 let prefix_str = if prefix.is_empty() {
248 String::new()
249 } else {
250 prefix.join(":")
251 };
252
253 let namespaces: Vec<Vec<String>> = members
254 .into_iter()
255 .filter(|ns| prefix.is_empty() || ns.starts_with(&prefix_str))
256 .map(|ns| ns.split(':').map(String::from).collect())
257 .collect();
258
259 Ok(namespaces)
260 }
261}