rustvello_redis/
connection.rs1use redis::aio::MultiplexedConnection;
2use redis::Client;
3use tokio::sync::Mutex;
4
5use rustvello_core::error::{RustvelloError, RustvelloResult};
6use rustvello_core::reconnectable::Reconnectable;
7
8#[non_exhaustive]
17pub struct RedisPool {
18 client: Client,
19 conn: Mutex<Option<MultiplexedConnection>>,
20 prefix: String,
22}
23
24impl RedisPool {
25 pub fn new(uri: &str, app_id: &str) -> RustvelloResult<Self> {
29 let client = Client::open(uri).map_err(|e| RustvelloError::Configuration {
30 message: format!("invalid Redis URI: {}", e),
31 })?;
32 Ok(Self {
33 client,
34 conn: Mutex::new(None),
35 prefix: format!("rustvello:{app_id}:"),
36 })
37 }
38
39 pub fn prefix(&self) -> &str {
41 &self.prefix
42 }
43
44 pub async fn conn(&self) -> RustvelloResult<MultiplexedConnection> {
46 let mut guard = self.conn.lock().await;
47 if let Some(c) = guard.as_ref() {
48 return Ok(c.clone());
49 }
50 let c = self
51 .client
52 .get_multiplexed_async_connection()
53 .await
54 .map_err(|e| RustvelloError::state_backend(format!("Redis connect: {}", e)))?;
55 *guard = Some(c.clone());
56 Ok(c)
57 }
58}
59
60pub(crate) fn redis_err(e: redis::RedisError) -> RustvelloError {
61 RustvelloError::state_backend(format!("Redis: {}", e))
62}
63
64#[async_trait::async_trait]
65impl Reconnectable for RedisPool {
66 async fn health_check(&self) -> bool {
67 match self.conn().await {
68 Ok(mut c) => redis::cmd("PING")
69 .query_async::<String>(&mut c)
70 .await
71 .is_ok(),
72 Err(_) => false,
73 }
74 }
75
76 async fn reconnect(&self) -> RustvelloResult<()> {
77 let mut guard = self.conn.lock().await;
78 *guard = None;
80 let c = self
81 .client
82 .get_multiplexed_async_connection()
83 .await
84 .map_err(|e| RustvelloError::state_backend(format!("Redis reconnect: {}", e)))?;
85 *guard = Some(c);
86 Ok(())
87 }
88}
89
90pub(crate) async fn scan_keys(
93 conn: &mut MultiplexedConnection,
94 pattern: &str,
95) -> RustvelloResult<Vec<String>> {
96 let mut cursor: u64 = 0;
97 let mut keys = Vec::new();
98 loop {
99 let (next_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
100 .arg(cursor)
101 .arg("MATCH")
102 .arg(pattern)
103 .arg("COUNT")
104 .arg(100)
105 .query_async(conn)
106 .await
107 .map_err(redis_err)?;
108 keys.extend(batch);
109 cursor = next_cursor;
110 if cursor == 0 {
111 break;
112 }
113 }
114 Ok(keys)
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed `redis://` URI is accepted by the constructor.
    #[test]
    fn pool_new_valid_uri() {
        let pool = RedisPool::new("redis://127.0.0.1/", "test");
        assert!(pool.is_ok());
    }

    /// A string that is not a Redis URI is rejected with a `Configuration`
    /// error.
    #[test]
    fn pool_new_invalid_uri() {
        let pool = RedisPool::new("not-a-uri", "test");
        assert!(pool.is_err());
        // Unpacked with a `match` so the `Ok` payload never needs formatting.
        let error = match pool {
            Ok(_) => panic!("expected error for invalid URI"),
            Err(e) => e,
        };
        assert!(
            matches!(error, RustvelloError::Configuration { .. }),
            "expected Configuration, got {error:?}"
        );
    }

    /// `redis_err` turns a Redis I/O error into the `Infrastructure` variant.
    #[test]
    fn redis_err_maps_to_storage() {
        let io_failure = redis::RedisError::from((redis::ErrorKind::IoError, "test IO error"));
        let converted = redis_err(io_failure);
        assert!(
            matches!(converted, RustvelloError::Infrastructure { .. }),
            "expected Infrastructure, got {converted:?}"
        );
    }
}