Skip to main content

synaptic_redis/
store.rs

1use async_trait::async_trait;
2use redis::AsyncCommands;
3use serde_json::Value;
4use synaptic_core::{Item, SynapticError};
5
/// Settings that control how [`RedisStore`] names its keys in Redis.
#[derive(Debug, Clone)]
pub struct RedisStoreConfig {
    /// String prepended to every key the store builds.
    /// The default value is `"synaptic:store:"`.
    pub prefix: String,
}

impl Default for RedisStoreConfig {
    /// Returns a configuration that uses the `"synaptic:store:"` key prefix.
    fn default() -> Self {
        let prefix = String::from("synaptic:store:");
        Self { prefix }
    }
}
20
/// Redis-backed implementation of the [`Store`](synaptic_core::Store) trait.
///
/// Keys are stored in the format `{prefix}{namespace_joined_by_colon}:{key}`,
/// and each item is written as a JSON string under its key.
/// A Redis SET at `{prefix}__namespaces__` tracks all known namespace paths
/// for efficient [`list_namespaces`](synaptic_core::Store::list_namespaces) queries.
///
/// Namespace segments and item keys are joined with `:` without any escaping,
/// so segments or keys that themselves contain `:` cannot be told apart from
/// nested namespaces once encoded.
pub struct RedisStore {
    /// Client from which connections to Redis are obtained.
    client: redis::Client,
    /// Key-layout settings (the key prefix).
    config: RedisStoreConfig,
}
30
31impl RedisStore {
32    /// Create a new `RedisStore` with an existing Redis client and configuration.
33    pub fn new(client: redis::Client, config: RedisStoreConfig) -> Self {
34        Self { client, config }
35    }
36
37    /// Create a new `RedisStore` from a Redis URL with default configuration.
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if the URL is invalid.
42    pub fn from_url(url: &str) -> Result<Self, SynapticError> {
43        let client = redis::Client::open(url)
44            .map_err(|e| SynapticError::Store(format!("failed to connect to Redis: {e}")))?;
45        Ok(Self {
46            client,
47            config: RedisStoreConfig::default(),
48        })
49    }
50
51    /// Create a new `RedisStore` from a Redis URL with custom configuration.
52    pub fn from_url_with_config(
53        url: &str,
54        config: RedisStoreConfig,
55    ) -> Result<Self, SynapticError> {
56        let client = redis::Client::open(url)
57            .map_err(|e| SynapticError::Store(format!("failed to connect to Redis: {e}")))?;
58        Ok(Self { client, config })
59    }
60
61    /// Build the Redis key for a given namespace and item key.
62    fn redis_key(&self, namespace: &[&str], key: &str) -> String {
63        let ns = namespace.join(":");
64        if ns.is_empty() {
65            format!("{}:{}", self.config.prefix.trim_end_matches(':'), key)
66        } else {
67            format!("{}{ns}:{key}", self.config.prefix)
68        }
69    }
70
71    /// Build the Redis key for the namespace index SET.
72    fn namespace_index_key(&self) -> String {
73        format!("{}__namespaces__", self.config.prefix)
74    }
75
76    /// Build the SCAN pattern for a given namespace.
77    fn scan_pattern(&self, namespace: &[&str]) -> String {
78        let ns = namespace.join(":");
79        if ns.is_empty() {
80            format!("{}*", self.config.prefix)
81        } else {
82            format!("{}{ns}:*", self.config.prefix)
83        }
84    }
85
86    /// Encode namespace as a string for storage in the namespace index.
87    fn encode_namespace(namespace: &[&str]) -> String {
88        namespace.join(":")
89    }
90
91    async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection, SynapticError> {
92        self.client
93            .get_multiplexed_async_connection()
94            .await
95            .map_err(|e| SynapticError::Store(format!("Redis connection error: {e}")))
96    }
97}
98
/// Return the current UTC time as an ISO 8601 / RFC 3339 timestamp with
/// millisecond precision, e.g. `2024-05-17T09:41:03.125Z`.
///
/// The value is derived from the system clock using only the standard
/// library. If the clock reports a time before the Unix epoch, the epoch
/// itself (`1970-01-01T00:00:00.000Z`) is returned.
fn now_iso() -> String {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    format_iso8601_utc(since_epoch.as_secs(), since_epoch.subsec_millis())
}

/// Render `secs` seconds plus `millis` milliseconds after the Unix epoch as
/// `YYYY-MM-DDTHH:MM:SS.mmmZ` (proleptic Gregorian calendar, UTC).
fn format_iso8601_utc(secs: u64, millis: u32) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    const DAYS_PER_ERA: u64 = 146_097; // one 400-year Gregorian cycle

    let days = secs / SECS_PER_DAY;
    let secs_of_day = secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day % 3_600 / 60;
    let second = secs_of_day % 60;

    // Days-to-civil-date conversion. The day count is shifted so that day 0
    // is 0000-03-01: starting years in March puts the leap day at the very
    // end of a year, which keeps the month arithmetic uniform.
    let shifted = days + 719_468;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March, 11 = February
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{millis:03}Z")
}
102
103/// Helper to GET a key from Redis as an `Option<String>`.
104async fn redis_get_string(
105    con: &mut redis::aio::MultiplexedConnection,
106    key: &str,
107) -> Result<Option<String>, SynapticError> {
108    let raw: Option<String> = con
109        .get(key)
110        .await
111        .map_err(|e| SynapticError::Store(format!("Redis GET error: {e}")))?;
112    Ok(raw)
113}
114
115#[async_trait]
116impl synaptic_core::Store for RedisStore {
117    async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
118        let mut con = self.get_connection().await?;
119        let redis_key = self.redis_key(namespace, key);
120
121        let raw = redis_get_string(&mut con, &redis_key).await?;
122
123        match raw {
124            Some(json_str) => {
125                let item: Item = serde_json::from_str(&json_str)
126                    .map_err(|e| SynapticError::Store(format!("JSON deserialize error: {e}")))?;
127                Ok(Some(item))
128            }
129            None => Ok(None),
130        }
131    }
132
133    async fn search(
134        &self,
135        namespace: &[&str],
136        query: Option<&str>,
137        limit: usize,
138    ) -> Result<Vec<Item>, SynapticError> {
139        let mut con = self.get_connection().await?;
140        let pattern = self.scan_pattern(namespace);
141        let ns_index_key = self.namespace_index_key();
142
143        // Collect all matching keys via SCAN
144        let mut keys: Vec<String> = Vec::new();
145        let mut cursor: u64 = 0;
146        loop {
147            let (next_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
148                .arg(cursor)
149                .arg("MATCH")
150                .arg(&pattern)
151                .arg("COUNT")
152                .arg(100)
153                .query_async(&mut con)
154                .await
155                .map_err(|e| SynapticError::Store(format!("Redis SCAN error: {e}")))?;
156
157            // Filter out the namespace index key
158            for k in batch {
159                if k != ns_index_key {
160                    keys.push(k);
161                }
162            }
163            cursor = next_cursor;
164            if cursor == 0 {
165                break;
166            }
167        }
168
169        // Load items
170        let mut items: Vec<Item> = Vec::new();
171        for k in &keys {
172            let raw = redis_get_string(&mut con, k).await?;
173            if let Some(json_str) = raw {
174                if let Ok(item) = serde_json::from_str::<Item>(&json_str) {
175                    // Apply substring filter if query is provided
176                    if let Some(q) = query {
177                        if item.key.contains(q) || item.value.to_string().contains(q) {
178                            items.push(item);
179                        }
180                    } else {
181                        items.push(item);
182                    }
183                }
184            }
185            if items.len() >= limit {
186                break;
187            }
188        }
189
190        items.truncate(limit);
191        Ok(items)
192    }
193
194    async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
195        let mut con = self.get_connection().await?;
196        let redis_key = self.redis_key(namespace, key);
197        let ns_index_key = self.namespace_index_key();
198        let ns_encoded = Self::encode_namespace(namespace);
199
200        // Check for existing item to preserve created_at
201        let existing = redis_get_string(&mut con, &redis_key).await?;
202
203        let now = now_iso();
204        let created_at = existing
205            .as_ref()
206            .and_then(|json_str| serde_json::from_str::<Item>(json_str).ok())
207            .map(|item| item.created_at)
208            .unwrap_or_else(|| now.clone());
209
210        let item = Item {
211            namespace: namespace.iter().map(|s| s.to_string()).collect(),
212            key: key.to_string(),
213            value,
214            created_at,
215            updated_at: now,
216            score: None,
217        };
218
219        let json_str = serde_json::to_string(&item)
220            .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
221
222        con.set::<_, _, ()>(&redis_key, &json_str)
223            .await
224            .map_err(|e| SynapticError::Store(format!("Redis SET error: {e}")))?;
225
226        // Track namespace in the index
227        con.sadd::<_, _, ()>(&ns_index_key, &ns_encoded)
228            .await
229            .map_err(|e| SynapticError::Store(format!("Redis SADD error: {e}")))?;
230
231        Ok(())
232    }
233
234    async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
235        let mut con = self.get_connection().await?;
236        let redis_key = self.redis_key(namespace, key);
237
238        con.del::<_, ()>(&redis_key)
239            .await
240            .map_err(|e| SynapticError::Store(format!("Redis DEL error: {e}")))?;
241
242        Ok(())
243    }
244
245    async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
246        let mut con = self.get_connection().await?;
247        let ns_index_key = self.namespace_index_key();
248
249        let members: Vec<String> = con
250            .smembers(&ns_index_key)
251            .await
252            .map_err(|e| SynapticError::Store(format!("Redis SMEMBERS error: {e}")))?;
253
254        let prefix_str = if prefix.is_empty() {
255            String::new()
256        } else {
257            prefix.join(":")
258        };
259
260        let namespaces: Vec<Vec<String>> = members
261            .into_iter()
262            .filter(|ns| prefix.is_empty() || ns.starts_with(&prefix_str))
263            .map(|ns| ns.split(':').map(String::from).collect())
264            .collect();
265
266        Ok(namespaces)
267    }
268}