Skip to main content

blazen_memory_valkey/
lib.rs

1//! # blazen-memory-valkey
2//!
3//! A Valkey/Redis backend for [`blazen-memory`].
4//!
5//! This crate provides [`ValkeyBackend`], an implementation of
6//! [`MemoryBackend`](blazen_memory::MemoryBackend) that persists entries in
7//! Valkey (or any Redis-compatible server) using the following key layout:
8//!
9//! | Key pattern                  | Type   | Contents                        |
10//! |------------------------------|--------|---------------------------------|
11//! | `{prefix}entry:{id}`         | STRING | JSON-serialized `StoredEntry`   |
12//! | `{prefix}bands:{band_value}` | SET    | Entry IDs sharing this LSH band |
13//! | `{prefix}ids`                | SET    | All entry IDs                   |
14//!
15//! ## Quick start
16//!
17//! ```rust,no_run
18//! use blazen_memory_valkey::ValkeyBackend;
19//! use blazen_memory::{Memory, MemoryStore, MemoryEntry};
20//!
21//! # async fn example() -> blazen_memory::error::Result<()> {
22//! let backend = ValkeyBackend::new("redis://localhost:6379")
23//!     .expect("failed to create Valkey client");
24//!
25//! let memory = Memory::local(backend);
26//! memory.add(vec![MemoryEntry::new("hello world")]).await?;
27//! # Ok(())
28//! # }
29//! ```
30//!
31//! # Features
32//!
33//! - `redis-tcp` (default): native TCP [`ValkeyBackend`] using the `redis`
34//!   crate. Does not compile on `wasm32-wasip1*` targets.
35//! - `upstash`: wasi-compatible [`UpstashBackend`] that talks to Upstash's
36//!   Redis REST API via [`blazen_llm::http::HttpClient`].
37
38#[cfg(feature = "redis-tcp")]
39use async_trait::async_trait;
40#[cfg(feature = "redis-tcp")]
41use redis::AsyncCommands;
42#[cfg(feature = "redis-tcp")]
43use tracing::instrument;
44
45#[cfg(feature = "redis-tcp")]
46use blazen_memory::error::{MemoryError, Result};
47#[cfg(feature = "redis-tcp")]
48use blazen_memory::store::MemoryBackend;
49#[cfg(feature = "redis-tcp")]
50use blazen_memory::types::StoredEntry;
51
52#[cfg(feature = "upstash")]
53pub mod upstash;
54#[cfg(feature = "upstash")]
55pub use upstash::UpstashBackend;
56
/// Default key prefix used for namespacing in Valkey/Redis.
///
/// Every key built by `ValkeyBackend` starts with this string unless the
/// prefix is replaced through `ValkeyBackend::with_prefix`.
#[cfg(feature = "redis-tcp")]
const DEFAULT_PREFIX: &str = "blazen:memory:";

/// A [`MemoryBackend`] implementation backed by Valkey (or any Redis-compatible server).
///
/// Uses [`redis::aio::MultiplexedConnection`] for async I/O and pipelines
/// multi-key operations for efficiency.
///
/// `Clone` copies the client handle and the key prefix, so a backend can be
/// handed to several owners. `Debug` is implemented by hand and prints only
/// the prefix (see the impl below).
#[cfg(feature = "redis-tcp")]
#[derive(Clone)]
pub struct ValkeyBackend {
    /// Client handle from which connections are requested.
    client: redis::Client,
    /// String prepended to every key this backend reads or writes.
    prefix: String,
}

#[cfg(feature = "redis-tcp")]
impl std::fmt::Debug for ValkeyBackend {
    /// Formats the backend without exposing the client.
    ///
    /// The client is built from a connection URL, which may embed
    /// credentials, so it is deliberately left out of the debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ValkeyBackend")
            .field("prefix", &self.prefix)
            .finish_non_exhaustive()
    }
}
70
#[cfg(feature = "redis-tcp")]
impl ValkeyBackend {
    /// Build a backend for the Redis/Valkey server at `url`, using the
    /// default key prefix (`blazen:memory:`).
    ///
    /// `url` follows the standard Redis URL format, e.g.
    /// `redis://localhost:6379`, or `rediss://...` for TLS.
    ///
    /// # Errors
    ///
    /// Returns the [`redis::RedisError`] produced by [`redis::Client::open`]
    /// when the URL is invalid or the client cannot be constructed.
    pub fn new(url: &str) -> std::result::Result<Self, redis::RedisError> {
        redis::Client::open(url).map(|client| Self {
            client,
            prefix: DEFAULT_PREFIX.to_owned(),
        })
    }

    /// Replace the key prefix used by this backend.
    ///
    /// Handy when several logical stores share one Valkey instance: each
    /// store gets its own prefix and therefore its own key space.
    #[must_use]
    pub fn with_prefix(mut self, prefix: &str) -> Self {
        // Reuse the existing allocation instead of building a new `String`.
        self.prefix.clear();
        self.prefix.push_str(prefix);
        self
    }

    /// Request a multiplexed async connection from the underlying client.
    ///
    /// Any client error is reported as [`MemoryError::Backend`].
    ///
    /// NOTE(review): this asks the client for a connection on every call;
    /// if connection setup shows up in profiles, consider keeping one
    /// `MultiplexedConnection` around and cloning it instead.
    async fn conn(&self) -> Result<redis::aio::MultiplexedConnection> {
        match self.client.get_multiplexed_async_connection().await {
            Ok(connection) => Ok(connection),
            Err(e) => Err(MemoryError::Backend(format!(
                "Valkey connection error: {e}"
            ))),
        }
    }

    // -- key helpers ----------------------------------------------------------

    /// Key of the STRING holding the JSON for entry `id`.
    fn entry_key(&self, id: &str) -> String {
        [self.prefix.as_str(), "entry:", id].concat()
    }

    /// Key of the SET of entry IDs that share the LSH band `band`.
    fn band_key(&self, band: &str) -> String {
        [self.prefix.as_str(), "bands:", band].concat()
    }

    /// Key of the SET that tracks every stored entry ID.
    fn ids_key(&self) -> String {
        [self.prefix.as_str(), "ids"].concat()
    }
}
122
123#[cfg(feature = "redis-tcp")]
124#[async_trait]
125impl MemoryBackend for ValkeyBackend {
126    #[instrument(skip(self, entry), fields(id = %entry.id))]
127    async fn put(&self, entry: StoredEntry) -> Result<()> {
128        let mut conn = self.conn().await?;
129
130        let json = serde_json::to_string(&entry)?;
131        let entry_key = self.entry_key(&entry.id);
132        let ids_key = self.ids_key();
133
134        // Pipeline: SET entry, SADD to ids set, SADD to each band set.
135        let mut pipe = redis::pipe();
136        pipe.atomic();
137        pipe.set(&entry_key, &json).ignore();
138        pipe.sadd(&ids_key, &entry.id).ignore();
139        for band in &entry.bands {
140            pipe.sadd(self.band_key(band), &entry.id).ignore();
141        }
142
143        pipe.query_async::<()>(&mut conn)
144            .await
145            .map_err(|e| MemoryError::Backend(format!("Valkey put error: {e}")))?;
146
147        Ok(())
148    }
149
150    #[instrument(skip(self))]
151    async fn get(&self, id: &str) -> Result<Option<StoredEntry>> {
152        let mut conn = self.conn().await?;
153        let entry_key = self.entry_key(id);
154
155        let raw: Option<String> = conn
156            .get(&entry_key)
157            .await
158            .map_err(|e| MemoryError::Backend(format!("Valkey get error: {e}")))?;
159
160        match raw {
161            Some(json) => {
162                let entry: StoredEntry = serde_json::from_str(&json)?;
163                Ok(Some(entry))
164            }
165            None => Ok(None),
166        }
167    }
168
169    #[instrument(skip(self))]
170    async fn delete(&self, id: &str) -> Result<bool> {
171        let mut conn = self.conn().await?;
172
173        // Fetch the entry first so we know which band sets to clean up.
174        let entry_key = self.entry_key(id);
175        let raw: Option<String> = conn
176            .get(&entry_key)
177            .await
178            .map_err(|e| MemoryError::Backend(format!("Valkey delete/get error: {e}")))?;
179
180        let entry = match raw {
181            Some(json) => serde_json::from_str::<StoredEntry>(&json)?,
182            None => return Ok(false),
183        };
184
185        // Pipeline: DEL entry key, SREM from ids set, SREM from each band set.
186        let mut pipe = redis::pipe();
187        pipe.atomic();
188        pipe.del(&entry_key).ignore();
189        pipe.srem(self.ids_key(), id).ignore();
190        for band in &entry.bands {
191            pipe.srem(self.band_key(band), id).ignore();
192        }
193
194        pipe.query_async::<()>(&mut conn)
195            .await
196            .map_err(|e| MemoryError::Backend(format!("Valkey delete error: {e}")))?;
197
198        Ok(true)
199    }
200
201    #[instrument(skip(self))]
202    async fn list(&self) -> Result<Vec<StoredEntry>> {
203        let mut conn = self.conn().await?;
204
205        let ids: Vec<String> = conn
206            .smembers(self.ids_key())
207            .await
208            .map_err(|e| MemoryError::Backend(format!("Valkey list/smembers error: {e}")))?;
209
210        if ids.is_empty() {
211            return Ok(Vec::new());
212        }
213
214        // MGET all entry keys at once.
215        let keys: Vec<String> = ids.iter().map(|id| self.entry_key(id)).collect();
216        let values: Vec<Option<String>> = conn
217            .mget(&keys)
218            .await
219            .map_err(|e| MemoryError::Backend(format!("Valkey list/mget error: {e}")))?;
220
221        let mut entries = Vec::with_capacity(values.len());
222        for raw in values.into_iter().flatten() {
223            let entry: StoredEntry = serde_json::from_str(&raw)?;
224            entries.push(entry);
225        }
226
227        Ok(entries)
228    }
229
230    #[instrument(skip(self))]
231    async fn len(&self) -> Result<usize> {
232        let mut conn = self.conn().await?;
233
234        let count: usize = conn
235            .scard(self.ids_key())
236            .await
237            .map_err(|e| MemoryError::Backend(format!("Valkey len error: {e}")))?;
238
239        Ok(count)
240    }
241
242    #[instrument(skip(self, bands))]
243    async fn search_by_bands(&self, bands: &[String], limit: usize) -> Result<Vec<StoredEntry>> {
244        if bands.is_empty() {
245            return Ok(Vec::new());
246        }
247
248        let mut conn = self.conn().await?;
249
250        let band_keys: Vec<String> = bands.iter().map(|b| self.band_key(b)).collect();
251
252        // SUNION across all matching band sets to get candidate IDs.
253        let candidate_ids: Vec<String> = redis::cmd("SUNION")
254            .arg(&band_keys)
255            .query_async(&mut conn)
256            .await
257            .map_err(|e| MemoryError::Backend(format!("Valkey search/sunion error: {e}")))?;
258
259        if candidate_ids.is_empty() {
260            return Ok(Vec::new());
261        }
262
263        // Cap candidates to limit before fetching full entries.
264        let capped: &[String] = if candidate_ids.len() > limit {
265            &candidate_ids[..limit]
266        } else {
267            &candidate_ids
268        };
269
270        let keys: Vec<String> = capped.iter().map(|id| self.entry_key(id)).collect();
271        let values: Vec<Option<String>> = conn
272            .mget(&keys)
273            .await
274            .map_err(|e| MemoryError::Backend(format!("Valkey search/mget error: {e}")))?;
275
276        let mut entries = Vec::with_capacity(values.len());
277        for raw in values.into_iter().flatten() {
278            let entry: StoredEntry = serde_json::from_str(&raw)?;
279            entries.push(entry);
280        }
281
282        Ok(entries)
283    }
284}
285
286// ---------------------------------------------------------------------------
287// Tests -- gated behind #[ignore] since they require a running Valkey/Redis
288// ---------------------------------------------------------------------------
289
#[cfg(all(test, feature = "redis-tcp"))]
mod tests {
    use super::*;

    /// URL of the server under test: `$VALKEY_URL`, or a local default.
    fn valkey_url() -> String {
        std::env::var("VALKEY_URL").unwrap_or_else(|_| "redis://localhost:6379".into())
    }

    /// Generate a unique prefix per test run to avoid collisions.
    fn test_prefix() -> String {
        let id = uuid::Uuid::new_v4();
        format!("blazen:test:{id}:")
    }

    /// Build a minimal entry with the given ID, text and bands.
    fn make_entry(id: &str, text: &str, bands: Vec<String>) -> StoredEntry {
        StoredEntry {
            id: id.to_owned(),
            text: text.to_owned(),
            elid: None,
            simhash_hex: None,
            text_simhash: 0,
            bands,
            metadata: serde_json::Value::Null,
        }
    }

    /// Backend pointed at the test server, isolated under a fresh prefix.
    fn backend() -> ValkeyBackend {
        ValkeyBackend::new(&valkey_url())
            .expect("failed to create ValkeyBackend")
            .with_prefix(&test_prefix())
    }

    /// Best-effort cleanup: delete every entry the backend currently lists.
    ///
    /// Only entries reachable through `list()` are removed; errors are
    /// ignored so a failed cleanup never masks the test result.
    async fn cleanup(backend: &ValkeyBackend) {
        if let Ok(entries) = backend.list().await {
            for entry in &entries {
                let _ = backend.delete(&entry.id).await;
            }
        }
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_put_get() {
        let b = backend();
        let entry = make_entry("e1", "hello world", vec!["b0".into(), "b1".into()]);

        b.put(entry).await.unwrap();

        let got = b
            .get("e1")
            .await
            .unwrap()
            .expect("entry e1 should exist after put");
        assert_eq!(got.id, "e1");
        assert_eq!(got.text, "hello world");
        assert_eq!(got.bands, vec!["b0", "b1"]);

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_get_missing() {
        let b = backend();
        let got = b.get("nonexistent").await.unwrap();
        assert!(got.is_none());
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_delete() {
        let b = backend();
        b.put(make_entry("d1", "delete me", vec!["x".into()]))
            .await
            .unwrap();

        // First delete removes the entry, the second finds nothing.
        assert!(b.delete("d1").await.unwrap());
        assert!(!b.delete("d1").await.unwrap());
        assert!(b.get("d1").await.unwrap().is_none());

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_put_overwrites() {
        let b = backend();
        b.put(make_entry("o1", "first", vec!["a".into()]))
            .await
            .unwrap();
        b.put(make_entry("o1", "second", vec!["a".into()]))
            .await
            .unwrap();

        assert_eq!(b.len().await.unwrap(), 1);
        let got = b.get("o1").await.unwrap().unwrap();
        assert_eq!(got.text, "second");

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_list() {
        let b = backend();
        b.put(make_entry("l1", "alpha", vec![])).await.unwrap();
        b.put(make_entry("l2", "beta", vec![])).await.unwrap();

        let all = b.list().await.unwrap();
        assert_eq!(all.len(), 2);

        // Order is not guaranteed, so check membership rather than position.
        let ids: Vec<&str> = all.iter().map(|e| e.id.as_str()).collect();
        assert!(ids.contains(&"l1"));
        assert!(ids.contains(&"l2"));

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_len() {
        let b = backend();
        assert_eq!(b.len().await.unwrap(), 0);

        b.put(make_entry("n1", "one", vec![])).await.unwrap();
        assert_eq!(b.len().await.unwrap(), 1);

        b.put(make_entry("n2", "two", vec![])).await.unwrap();
        assert_eq!(b.len().await.unwrap(), 2);

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_search_by_bands() {
        let b = backend();

        b.put(make_entry("s1", "alpha", vec!["aaa".into(), "bbb".into()]))
            .await
            .unwrap();
        b.put(make_entry("s2", "beta", vec!["ccc".into(), "ddd".into()]))
            .await
            .unwrap();
        b.put(make_entry("s3", "gamma", vec!["eee".into(), "fff".into()]))
            .await
            .unwrap();

        // Search for bands that match s1 and s2 but not s3.
        let results = b
            .search_by_bands(&["aaa".into(), "ddd".into()], 10)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);

        let ids: Vec<&str> = results.iter().map(|e| e.id.as_str()).collect();
        assert!(ids.contains(&"s1"));
        assert!(ids.contains(&"s2"));

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_search_by_bands_no_match() {
        let b = backend();
        b.put(make_entry("x1", "solo", vec!["aaa".into()]))
            .await
            .unwrap();

        let results = b.search_by_bands(&["zzz".into()], 10).await.unwrap();
        assert!(results.is_empty());

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_search_by_bands_respects_limit() {
        let b = backend();

        for i in 0..10 {
            b.put(make_entry(
                &format!("lim{i}"),
                &format!("entry {i}"),
                vec!["shared".into()],
            ))
            .await
            .unwrap();
        }

        // Ten entries share the band, so the cap must be hit exactly; a
        // `<= 3` check would also pass if the search returned nothing.
        let results = b.search_by_bands(&["shared".into()], 3).await.unwrap();
        assert_eq!(results.len(), 3);

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_delete_cleans_bands() {
        let b = backend();
        b.put(make_entry("bc1", "band cleanup", vec!["band_x".into()]))
            .await
            .unwrap();

        b.delete("bc1").await.unwrap();

        // The band set should no longer return this id.
        let results = b.search_by_bands(&["band_x".into()], 10).await.unwrap();
        assert!(results.is_empty());

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_custom_prefix() {
        let b = ValkeyBackend::new(&valkey_url())
            .unwrap()
            .with_prefix(&format!("custom:{}:", uuid::Uuid::new_v4()));

        b.put(make_entry("p1", "prefixed", vec![])).await.unwrap();
        assert_eq!(b.len().await.unwrap(), 1);

        cleanup(&b).await;
    }
}