1#[cfg(feature = "redis-tcp")]
39use async_trait::async_trait;
40#[cfg(feature = "redis-tcp")]
41use redis::AsyncCommands;
42#[cfg(feature = "redis-tcp")]
43use tracing::instrument;
44
45#[cfg(feature = "redis-tcp")]
46use blazen_memory::error::{MemoryError, Result};
47#[cfg(feature = "redis-tcp")]
48use blazen_memory::store::MemoryBackend;
49#[cfg(feature = "redis-tcp")]
50use blazen_memory::types::StoredEntry;
51
52#[cfg(feature = "upstash")]
53pub mod upstash;
54#[cfg(feature = "upstash")]
55pub use upstash::UpstashBackend;
56
/// Key prefix used by [`ValkeyBackend::new`]; it can be replaced afterwards
/// with [`ValkeyBackend::with_prefix`].
#[cfg(feature = "redis-tcp")]
const DEFAULT_PREFIX: &str = "blazen:memory:";
60
/// [`MemoryBackend`] implementation that keeps entries in a Valkey/Redis
/// server reached through the `redis` crate.
///
/// Only compiled when the `redis-tcp` feature is enabled.
#[cfg(feature = "redis-tcp")]
pub struct ValkeyBackend {
    /// Client from which each operation requests a multiplexed async connection.
    client: redis::Client,
    /// String prepended to every key this backend builds.
    prefix: String,
}
70
#[cfg(feature = "redis-tcp")]
impl ValkeyBackend {
    /// Builds a backend for the server at `url`, using [`DEFAULT_PREFIX`] as
    /// the key prefix.
    ///
    /// # Errors
    ///
    /// Returns the [`redis::RedisError`] reported by [`redis::Client::open`]
    /// when a client cannot be created from `url`.
    pub fn new(url: &str) -> std::result::Result<Self, redis::RedisError> {
        redis::Client::open(url).map(|client| Self {
            client,
            prefix: DEFAULT_PREFIX.to_owned(),
        })
    }

    /// Replaces the key prefix and hands the backend back (builder style).
    #[must_use]
    pub fn with_prefix(mut self, prefix: &str) -> Self {
        // Reuse the existing allocation instead of building a new `String`.
        self.prefix.clear();
        self.prefix.push_str(prefix);
        self
    }

    /// Requests a multiplexed async connection from the client.
    ///
    /// Failures are reported as [`MemoryError::Backend`].
    async fn conn(&self) -> Result<redis::aio::MultiplexedConnection> {
        match self.client.get_multiplexed_async_connection().await {
            Ok(connection) => Ok(connection),
            Err(e) => Err(MemoryError::Backend(format!(
                "Valkey connection error: {e}"
            ))),
        }
    }

    /// Key holding the serialized entry for `id`: `<prefix>entry:<id>`.
    fn entry_key(&self, id: &str) -> String {
        let prefix = &self.prefix;
        format!("{prefix}entry:{id}")
    }

    /// Key of the set of entry ids indexed under `band`: `<prefix>bands:<band>`.
    fn band_key(&self, band: &str) -> String {
        let prefix = &self.prefix;
        format!("{prefix}bands:{band}")
    }

    /// Key of the set containing every stored entry id: `<prefix>ids`.
    fn ids_key(&self) -> String {
        let prefix = &self.prefix;
        format!("{prefix}ids")
    }
}
122
#[cfg(feature = "redis-tcp")]
#[async_trait]
impl MemoryBackend for ValkeyBackend {
    /// Stores `entry`, replacing any entry already stored under the same id.
    ///
    /// The entry is serialized to JSON and written to its entry key; its id
    /// is added to the id set and to the set of every band it carries. When
    /// an older version of the entry exists, the id is also removed from the
    /// band sets the new version no longer belongs to, so `search_by_bands`
    /// does not keep returning it for bands it has dropped.
    ///
    /// All writes are sent as one atomic pipeline. The lookup of the previous
    /// version happens before that pipeline, so it is not part of the same
    /// transaction.
    ///
    /// # Errors
    ///
    /// Returns an error if the entry cannot be serialized or if the server
    /// reports a failure ([`MemoryError::Backend`]).
    #[instrument(skip(self, entry), fields(id = %entry.id))]
    async fn put(&self, entry: StoredEntry) -> Result<()> {
        let mut conn = self.conn().await?;

        let json = serde_json::to_string(&entry)?;
        let entry_key = self.entry_key(&entry.id);
        let ids_key = self.ids_key();

        // Look up the version being overwritten (if any) to find band sets
        // that would otherwise keep a stale reference to this id.
        let previous: Option<String> = conn
            .get(&entry_key)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey put/get error: {e}")))?;
        // An unreadable previous value is simply overwritten; it must not
        // prevent the new entry from being stored.
        let stale_bands: Vec<String> = previous
            .and_then(|raw| serde_json::from_str::<StoredEntry>(&raw).ok())
            .map(|old| {
                old.bands
                    .into_iter()
                    .filter(|band| !entry.bands.contains(band))
                    .collect()
            })
            .unwrap_or_default();

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.set(&entry_key, &json).ignore();
        pipe.sadd(&ids_key, &entry.id).ignore();
        for band in &stale_bands {
            pipe.srem(self.band_key(band), &entry.id).ignore();
        }
        for band in &entry.bands {
            pipe.sadd(self.band_key(band), &entry.id).ignore();
        }

        pipe.query_async::<()>(&mut conn)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey put error: {e}")))?;

        Ok(())
    }

    /// Fetches the entry stored under `id`, or `None` when the key is absent.
    ///
    /// # Errors
    ///
    /// Returns an error if the server call fails or the stored JSON cannot be
    /// deserialized into a [`StoredEntry`].
    #[instrument(skip(self))]
    async fn get(&self, id: &str) -> Result<Option<StoredEntry>> {
        let mut conn = self.conn().await?;
        let entry_key = self.entry_key(id);

        let raw: Option<String> = conn
            .get(&entry_key)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey get error: {e}")))?;

        match raw {
            Some(json) => {
                let entry: StoredEntry = serde_json::from_str(&json)?;
                Ok(Some(entry))
            }
            None => Ok(None),
        }
    }

    /// Removes the entry stored under `id` together with its index
    /// memberships.
    ///
    /// Returns `Ok(false)` when no entry exists for `id`, `Ok(true)` after a
    /// successful removal.
    ///
    /// # Errors
    ///
    /// Returns an error if the server call fails or the stored JSON cannot be
    /// deserialized (the bands to clean up are read from the stored entry).
    #[instrument(skip(self))]
    async fn delete(&self, id: &str) -> Result<bool> {
        let mut conn = self.conn().await?;

        let entry_key = self.entry_key(id);
        // The stored entry is needed to know which band sets reference the id.
        let raw: Option<String> = conn
            .get(&entry_key)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey delete/get error: {e}")))?;

        let entry = match raw {
            Some(json) => serde_json::from_str::<StoredEntry>(&json)?,
            None => return Ok(false),
        };

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.del(&entry_key).ignore();
        pipe.srem(self.ids_key(), id).ignore();
        for band in &entry.bands {
            pipe.srem(self.band_key(band), id).ignore();
        }

        pipe.query_async::<()>(&mut conn)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey delete error: {e}")))?;

        Ok(true)
    }

    /// Returns every entry whose id is in the id set.
    ///
    /// Ids whose entry key no longer exists are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if a server call fails or any stored JSON cannot be
    /// deserialized.
    #[instrument(skip(self))]
    async fn list(&self) -> Result<Vec<StoredEntry>> {
        let mut conn = self.conn().await?;

        let ids: Vec<String> = conn
            .smembers(self.ids_key())
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey list/smembers error: {e}")))?;

        // MGET needs at least one key, so an empty id set is answered locally.
        if ids.is_empty() {
            return Ok(Vec::new());
        }

        let keys: Vec<String> = ids.iter().map(|id| self.entry_key(id)).collect();
        let values: Vec<Option<String>> = conn
            .mget(&keys)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey list/mget error: {e}")))?;

        decode_entries(values)
    }

    /// Returns the number of ids in the id set.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError::Backend`] if the server call fails.
    #[instrument(skip(self))]
    async fn len(&self) -> Result<usize> {
        let mut conn = self.conn().await?;

        let count: usize = conn
            .scard(self.ids_key())
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey len error: {e}")))?;

        Ok(count)
    }

    /// Returns entries indexed under at least one of `bands`.
    ///
    /// The candidate ids are the union of the band sets; at most `limit` of
    /// them are loaded. Candidates whose entry key is missing are skipped, so
    /// fewer than `limit` entries may come back even when more candidates
    /// exist. An empty `bands` slice or a `limit` of zero yields an empty
    /// result without contacting the server.
    ///
    /// # Errors
    ///
    /// Returns an error if a server call fails or any stored JSON cannot be
    /// deserialized.
    #[instrument(skip(self, bands))]
    async fn search_by_bands(&self, bands: &[String], limit: usize) -> Result<Vec<StoredEntry>> {
        // A zero limit would otherwise end in an MGET without keys, which the
        // server rejects; nothing can be returned in that case anyway.
        if bands.is_empty() || limit == 0 {
            return Ok(Vec::new());
        }

        let mut conn = self.conn().await?;

        let band_keys: Vec<String> = bands.iter().map(|b| self.band_key(b)).collect();

        let candidate_ids: Vec<String> = redis::cmd("SUNION")
            .arg(&band_keys)
            .query_async(&mut conn)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey search/sunion error: {e}")))?;

        if candidate_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Cap the candidates before loading them; `limit >= 1` here, so at
        // least one key is always requested.
        let keys: Vec<String> = candidate_ids
            .iter()
            .take(limit)
            .map(|id| self.entry_key(id))
            .collect();
        let values: Vec<Option<String>> = conn
            .mget(&keys)
            .await
            .map_err(|e| MemoryError::Backend(format!("Valkey search/mget error: {e}")))?;

        decode_entries(values)
    }
}

/// Deserializes the present values of an `MGET` reply, skipping missing keys.
#[cfg(feature = "redis-tcp")]
fn decode_entries(values: Vec<Option<String>>) -> Result<Vec<StoredEntry>> {
    let mut entries = Vec::with_capacity(values.len());
    for raw in values.into_iter().flatten() {
        let entry: StoredEntry = serde_json::from_str(&raw)?;
        entries.push(entry);
    }
    Ok(entries)
}
285
// Integration tests for `ValkeyBackend`.
//
// Every test is `#[ignore]`d because it needs a live server; the address is
// read from `$VALKEY_URL` and falls back to `redis://localhost:6379`.
#[cfg(all(test, feature = "redis-tcp"))]
mod tests {
    use super::*;

    /// Server URL taken from `$VALKEY_URL`, with a localhost fallback.
    fn valkey_url() -> String {
        std::env::var("VALKEY_URL").unwrap_or_else(|_| "redis://localhost:6379".into())
    }

    /// Fresh random key prefix so tests do not see each other's keys.
    fn test_prefix() -> String {
        let id = uuid::Uuid::new_v4();
        format!("blazen:test:{id}:")
    }

    /// Builds a minimal entry with the given id, text and bands.
    fn make_entry(id: &str, text: &str, bands: Vec<String>) -> StoredEntry {
        StoredEntry {
            id: id.to_owned(),
            text: text.to_owned(),
            elid: None,
            simhash_hex: None,
            text_simhash: 0,
            bands,
            metadata: serde_json::Value::Null,
        }
    }

    /// Backend pointed at the test server with a unique prefix.
    fn backend() -> ValkeyBackend {
        ValkeyBackend::new(&valkey_url())
            .expect("failed to create ValkeyBackend")
            .with_prefix(&test_prefix())
    }

    /// Best-effort removal of everything the test stored under its prefix;
    /// errors are deliberately ignored.
    async fn cleanup(backend: &ValkeyBackend) {
        if let Ok(entries) = backend.list().await {
            for entry in &entries {
                let _ = backend.delete(&entry.id).await;
            }
        }
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_put_get() {
        let b = backend();
        let entry = make_entry("e1", "hello world", vec!["b0".into(), "b1".into()]);

        b.put(entry).await.unwrap();

        let got = b
            .get("e1")
            .await
            .unwrap()
            .expect("entry e1 should exist after put");
        assert_eq!(got.id, "e1");
        assert_eq!(got.text, "hello world");
        assert_eq!(got.bands, vec!["b0", "b1"]);

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_get_missing() {
        let b = backend();
        let got = b.get("nonexistent").await.unwrap();
        assert!(got.is_none());
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_delete() {
        let b = backend();
        b.put(make_entry("d1", "delete me", vec!["x".into()]))
            .await
            .unwrap();

        // First delete removes the entry, the second finds nothing.
        assert!(b.delete("d1").await.unwrap());
        assert!(!b.delete("d1").await.unwrap());
        assert!(b.get("d1").await.unwrap().is_none());

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_put_overwrites() {
        let b = backend();
        b.put(make_entry("o1", "first", vec!["a".into()]))
            .await
            .unwrap();
        b.put(make_entry("o1", "second", vec!["a".into()]))
            .await
            .unwrap();

        assert_eq!(b.len().await.unwrap(), 1);
        let got = b.get("o1").await.unwrap().unwrap();
        assert_eq!(got.text, "second");

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_list() {
        let b = backend();
        b.put(make_entry("l1", "alpha", vec![])).await.unwrap();
        b.put(make_entry("l2", "beta", vec![])).await.unwrap();

        let all = b.list().await.unwrap();
        assert_eq!(all.len(), 2);

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_len() {
        let b = backend();
        assert_eq!(b.len().await.unwrap(), 0);

        b.put(make_entry("n1", "one", vec![])).await.unwrap();
        assert_eq!(b.len().await.unwrap(), 1);

        b.put(make_entry("n2", "two", vec![])).await.unwrap();
        assert_eq!(b.len().await.unwrap(), 2);

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_search_by_bands() {
        let b = backend();

        b.put(make_entry("s1", "alpha", vec!["aaa".into(), "bbb".into()]))
            .await
            .unwrap();
        b.put(make_entry("s2", "beta", vec!["ccc".into(), "ddd".into()]))
            .await
            .unwrap();
        b.put(make_entry("s3", "gamma", vec!["eee".into(), "fff".into()]))
            .await
            .unwrap();

        // "aaa" belongs to s1 and "ddd" to s2; s3 shares neither band.
        let results = b
            .search_by_bands(&["aaa".into(), "ddd".into()], 10)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);

        let ids: Vec<&str> = results.iter().map(|e| e.id.as_str()).collect();
        assert!(ids.contains(&"s1"));
        assert!(ids.contains(&"s2"));

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_search_by_bands_no_match() {
        let b = backend();
        b.put(make_entry("x1", "solo", vec!["aaa".into()]))
            .await
            .unwrap();

        let results = b.search_by_bands(&["zzz".into()], 10).await.unwrap();
        assert!(results.is_empty());

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_search_by_bands_respects_limit() {
        let b = backend();

        for i in 0..10 {
            b.put(make_entry(
                &format!("lim{i}"),
                &format!("entry {i}"),
                vec!["shared".into()],
            ))
            .await
            .unwrap();
        }

        // Ten entries share the band and all of them exist, so the cap must
        // be hit exactly; `<= 3` would also accept an empty result.
        let results = b.search_by_bands(&["shared".into()], 3).await.unwrap();
        assert_eq!(results.len(), 3);

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_delete_cleans_bands() {
        let b = backend();
        b.put(make_entry("bc1", "band cleanup", vec!["band_x".into()]))
            .await
            .unwrap();

        b.delete("bc1").await.unwrap();

        let results = b.search_by_bands(&["band_x".into()], 10).await.unwrap();
        assert!(results.is_empty());

        cleanup(&b).await;
    }

    #[tokio::test]
    #[ignore = "requires running Valkey at $VALKEY_URL"]
    async fn test_valkey_custom_prefix() {
        let b = ValkeyBackend::new(&valkey_url())
            .unwrap()
            .with_prefix(&format!("custom:{}:", uuid::Uuid::new_v4()));

        b.put(make_entry("p1", "prefixed", vec![])).await.unwrap();
        assert_eq!(b.len().await.unwrap(), 1);

        cleanup(&b).await;
    }
}