Skip to main content

rustvello_redis/
connection.rs

1use redis::aio::MultiplexedConnection;
2use redis::Client;
3use tokio::sync::Mutex;
4
5use rustvello_core::error::{RustvelloError, RustvelloResult};
6use rustvello_core::reconnectable::Reconnectable;
7
8/// Shared Redis connection pool.
9///
10/// Wraps a `MultiplexedConnection` which internally multiplexes
11/// multiple concurrent requests over a single TCP connection.
12///
13/// Data is isolated by `app_id`: every Redis key is prefixed with
14/// `rustvello:{app_id}:`, so two pools created with different `app_id`
15/// values against the same Redis server will not see each other's data.
16#[non_exhaustive]
17pub struct RedisPool {
18    client: Client,
19    conn: Mutex<Option<MultiplexedConnection>>,
20    /// Precomputed key root: `"rustvello:{app_id}:"`.
21    prefix: String,
22}
23
24impl RedisPool {
25    /// Create a new pool from a Redis URI (e.g. `redis://127.0.0.1/`).
26    ///
27    /// The `app_id` is used to namespace every key as `rustvello:{app_id}:…`.
28    pub fn new(uri: &str, app_id: &str) -> RustvelloResult<Self> {
29        let client = Client::open(uri).map_err(|e| RustvelloError::Configuration {
30            message: format!("invalid Redis URI: {}", e),
31        })?;
32        Ok(Self {
33            client,
34            conn: Mutex::new(None),
35            prefix: format!("rustvello:{app_id}:"),
36        })
37    }
38
39    /// Key prefix including the trailing colon: `"rustvello:{app_id}:"`.
40    pub fn prefix(&self) -> &str {
41        &self.prefix
42    }
43
44    /// Get or create a multiplexed connection.
45    pub async fn conn(&self) -> RustvelloResult<MultiplexedConnection> {
46        let mut guard = self.conn.lock().await;
47        if let Some(c) = guard.as_ref() {
48            return Ok(c.clone());
49        }
50        let c = self
51            .client
52            .get_multiplexed_async_connection()
53            .await
54            .map_err(|e| RustvelloError::state_backend(format!("Redis connect: {}", e)))?;
55        *guard = Some(c.clone());
56        Ok(c)
57    }
58}
59
60pub(crate) fn redis_err(e: redis::RedisError) -> RustvelloError {
61    RustvelloError::state_backend(format!("Redis: {}", e))
62}
63
64#[async_trait::async_trait]
65impl Reconnectable for RedisPool {
66    async fn health_check(&self) -> bool {
67        match self.conn().await {
68            Ok(mut c) => redis::cmd("PING")
69                .query_async::<String>(&mut c)
70                .await
71                .is_ok(),
72            Err(_) => false,
73        }
74    }
75
76    async fn reconnect(&self) -> RustvelloResult<()> {
77        let mut guard = self.conn.lock().await;
78        // Drop old connection
79        *guard = None;
80        let c = self
81            .client
82            .get_multiplexed_async_connection()
83            .await
84            .map_err(|e| RustvelloError::state_backend(format!("Redis reconnect: {}", e)))?;
85        *guard = Some(c);
86        Ok(())
87    }
88}
89
90/// Scan Redis keys matching a pattern using cursor-based SCAN.
91/// Unlike KEYS, SCAN does not block the server for the entire keyspace.
92pub(crate) async fn scan_keys(
93    conn: &mut MultiplexedConnection,
94    pattern: &str,
95) -> RustvelloResult<Vec<String>> {
96    let mut cursor: u64 = 0;
97    let mut keys = Vec::new();
98    loop {
99        let (next_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
100            .arg(cursor)
101            .arg("MATCH")
102            .arg(pattern)
103            .arg("COUNT")
104            .arg(100)
105            .query_async(conn)
106            .await
107            .map_err(redis_err)?;
108        keys.extend(batch);
109        cursor = next_cursor;
110        if cursor == 0 {
111            break;
112        }
113    }
114    Ok(keys)
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed Redis URI is accepted by the constructor.
    #[test]
    fn pool_new_valid_uri() {
        let pool = RedisPool::new("redis://127.0.0.1/", "test");
        assert!(pool.is_ok());
    }

    /// The key root is `rustvello:{app_id}:`, trailing colon included.
    #[test]
    fn pool_prefix_includes_app_id() {
        let pool = match RedisPool::new("redis://127.0.0.1/", "test") {
            Ok(pool) => pool,
            Err(e) => panic!("expected valid URI to be accepted, got {:?}", e),
        };
        assert_eq!(pool.prefix(), "rustvello:test:");
    }

    /// A string that is not a Redis URI is reported as a configuration error.
    #[test]
    fn pool_new_invalid_uri() {
        // `RedisPool` has no `Debug` impl, so `unwrap_err` is unavailable;
        // destructure the result by hand instead.
        let Err(err) = RedisPool::new("not-a-uri", "test") else {
            panic!("expected error for invalid URI");
        };
        assert!(
            matches!(err, RustvelloError::Configuration { .. }),
            "expected Configuration, got {:?}",
            err
        );
    }

    /// `redis_err` wraps driver errors in the `Infrastructure` variant.
    #[test]
    fn redis_err_maps_to_infrastructure() {
        let redis_error = redis::RedisError::from((redis::ErrorKind::IoError, "test IO error"));
        let mapped = redis_err(redis_error);
        assert!(
            matches!(mapped, RustvelloError::Infrastructure { .. }),
            "expected Infrastructure, got {:?}",
            mapped
        );
    }
}