Skip to main content

synaptic_redis/
store.rs

1use async_trait::async_trait;
2use redis::AsyncCommands;
3use serde_json::Value;
4use synaptic_core::{encode_namespace, now_iso, Item, SynapticError};
5
6use crate::connection::{collect_matching_keys, RedisBackend, RedisConn};
7
8/// Configuration for [`RedisStore`].
9#[derive(Debug, Clone)]
10pub struct RedisStoreConfig {
11    /// Key prefix for all store entries. Defaults to `"synaptic:store:"`.
12    pub prefix: String,
13}
14
15impl Default for RedisStoreConfig {
16    fn default() -> Self {
17        Self {
18            prefix: "synaptic:store:".to_string(),
19        }
20    }
21}
22
23/// Redis-backed implementation of the [`Store`](synaptic_core::Store) trait.
24///
25/// Keys are stored in the format `{prefix}{namespace_joined_by_colon}:{key}`.
26/// A Redis SET at `{prefix}__namespaces__` tracks all known namespace paths
27/// for efficient [`list_namespaces`](synaptic_core::Store::list_namespaces) queries.
28///
29/// Supports both standalone Redis and Redis Cluster (with the `cluster` feature).
30pub struct RedisStore {
31    backend: RedisBackend,
32    config: RedisStoreConfig,
33}
34
35impl RedisStore {
36    /// Create a new `RedisStore` from a Redis URL with default configuration.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the URL is invalid.
41    pub fn from_url(url: &str) -> Result<Self, SynapticError> {
42        Ok(Self {
43            backend: RedisBackend::standalone(url)?,
44            config: RedisStoreConfig::default(),
45        })
46    }
47
48    /// Create a new `RedisStore` from a Redis URL with custom configuration.
49    pub fn from_url_with_config(
50        url: &str,
51        config: RedisStoreConfig,
52    ) -> Result<Self, SynapticError> {
53        Ok(Self {
54            backend: RedisBackend::standalone(url)?,
55            config,
56        })
57    }
58
59    /// Create a new `RedisStore` from an existing [`RedisBackend`].
60    #[allow(dead_code)]
61    pub(crate) fn from_backend(backend: RedisBackend, config: RedisStoreConfig) -> Self {
62        Self { backend, config }
63    }
64
65    /// Create a new `RedisStore` connecting to a Redis Cluster.
66    #[cfg(feature = "cluster")]
67    pub fn from_cluster_nodes(nodes: &[&str]) -> Result<Self, SynapticError> {
68        Ok(Self {
69            backend: RedisBackend::cluster(nodes)?,
70            config: RedisStoreConfig::default(),
71        })
72    }
73
74    /// Create a new `RedisStore` connecting to a Redis Cluster with custom configuration.
75    #[cfg(feature = "cluster")]
76    pub fn from_cluster_nodes_with_config(
77        nodes: &[&str],
78        config: RedisStoreConfig,
79    ) -> Result<Self, SynapticError> {
80        Ok(Self {
81            backend: RedisBackend::cluster(nodes)?,
82            config,
83        })
84    }
85
86    /// Build the Redis key for a given namespace and item key.
87    fn redis_key(&self, namespace: &[&str], key: &str) -> String {
88        let ns = namespace.join(":");
89        if ns.is_empty() {
90            format!("{}:{}", self.config.prefix.trim_end_matches(':'), key)
91        } else {
92            format!("{}{ns}:{key}", self.config.prefix)
93        }
94    }
95
96    /// Build the Redis key for the namespace index SET.
97    fn namespace_index_key(&self) -> String {
98        format!("{}__namespaces__", self.config.prefix)
99    }
100
101    /// Build the SCAN/KEYS pattern for a given namespace.
102    fn scan_pattern(&self, namespace: &[&str]) -> String {
103        let ns = namespace.join(":");
104        if ns.is_empty() {
105            format!("{}*", self.config.prefix)
106        } else {
107            format!("{}{ns}:*", self.config.prefix)
108        }
109    }
110
111    async fn get_connection(&self) -> Result<RedisConn, SynapticError> {
112        self.backend.get_connection().await
113    }
114}
115
116/// Helper to GET a key from Redis as an `Option<String>`.
117async fn redis_get_string(con: &mut RedisConn, key: &str) -> Result<Option<String>, SynapticError> {
118    let raw: Option<String> = con
119        .get(key)
120        .await
121        .map_err(|e| SynapticError::Store(format!("Redis GET error: {e}")))?;
122    Ok(raw)
123}
124
125#[async_trait]
126impl synaptic_core::Store for RedisStore {
127    async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
128        let mut con = self.get_connection().await?;
129        let redis_key = self.redis_key(namespace, key);
130
131        let raw = redis_get_string(&mut con, &redis_key).await?;
132
133        match raw {
134            Some(json_str) => {
135                let item: Item = serde_json::from_str(&json_str)
136                    .map_err(|e| SynapticError::Store(format!("JSON deserialize error: {e}")))?;
137                Ok(Some(item))
138            }
139            None => Ok(None),
140        }
141    }
142
143    async fn search(
144        &self,
145        namespace: &[&str],
146        query: Option<&str>,
147        limit: usize,
148    ) -> Result<Vec<Item>, SynapticError> {
149        let mut con = self.get_connection().await?;
150        let pattern = self.scan_pattern(namespace);
151        let ns_index_key = self.namespace_index_key();
152
153        // Collect all matching keys (SCAN for standalone, KEYS for cluster)
154        let all_keys = collect_matching_keys(&mut con, &pattern).await?;
155
156        // Filter out the namespace index key
157        let keys: Vec<String> = all_keys
158            .into_iter()
159            .filter(|k| k != &ns_index_key)
160            .collect();
161
162        // Load items
163        let mut items: Vec<Item> = Vec::new();
164        for k in &keys {
165            let raw = redis_get_string(&mut con, k).await?;
166            if let Some(json_str) = raw {
167                if let Ok(item) = serde_json::from_str::<Item>(&json_str) {
168                    // Apply substring filter if query is provided
169                    if let Some(q) = query {
170                        if item.key.contains(q) || item.value.to_string().contains(q) {
171                            items.push(item);
172                        }
173                    } else {
174                        items.push(item);
175                    }
176                }
177            }
178            if items.len() >= limit {
179                break;
180            }
181        }
182
183        items.truncate(limit);
184        Ok(items)
185    }
186
187    async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
188        let mut con = self.get_connection().await?;
189        let redis_key = self.redis_key(namespace, key);
190        let ns_index_key = self.namespace_index_key();
191        let ns_encoded = encode_namespace(namespace);
192
193        // Check for existing item to preserve created_at
194        let existing = redis_get_string(&mut con, &redis_key).await?;
195
196        let now = now_iso();
197        let created_at = existing
198            .as_ref()
199            .and_then(|json_str| serde_json::from_str::<Item>(json_str).ok())
200            .map(|item| item.created_at)
201            .unwrap_or_else(|| now.clone());
202
203        let item = Item {
204            namespace: namespace.iter().map(|s| s.to_string()).collect(),
205            key: key.to_string(),
206            value,
207            created_at,
208            updated_at: now,
209            score: None,
210        };
211
212        let json_str = serde_json::to_string(&item)
213            .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
214
215        con.set::<_, _, ()>(&redis_key, &json_str)
216            .await
217            .map_err(|e| SynapticError::Store(format!("Redis SET error: {e}")))?;
218
219        // Track namespace in the index
220        con.sadd::<_, _, ()>(&ns_index_key, &ns_encoded)
221            .await
222            .map_err(|e| SynapticError::Store(format!("Redis SADD error: {e}")))?;
223
224        Ok(())
225    }
226
227    async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
228        let mut con = self.get_connection().await?;
229        let redis_key = self.redis_key(namespace, key);
230
231        con.del::<_, ()>(&redis_key)
232            .await
233            .map_err(|e| SynapticError::Store(format!("Redis DEL error: {e}")))?;
234
235        Ok(())
236    }
237
238    async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
239        let mut con = self.get_connection().await?;
240        let ns_index_key = self.namespace_index_key();
241
242        let members: Vec<String> = con
243            .smembers(&ns_index_key)
244            .await
245            .map_err(|e| SynapticError::Store(format!("Redis SMEMBERS error: {e}")))?;
246
247        let prefix_str = if prefix.is_empty() {
248            String::new()
249        } else {
250            prefix.join(":")
251        };
252
253        let namespaces: Vec<Vec<String>> = members
254            .into_iter()
255            .filter(|ns| prefix.is_empty() || ns.starts_with(&prefix_str))
256            .map(|ns| ns.split(':').map(String::from).collect())
257            .collect();
258
259        Ok(namespaces)
260    }
261}