polars_redis/
connection.rs

1//! Redis connection management.
2//!
3//! This module provides connection abstractions for both single-node Redis
4//! and Redis Cluster deployments.
5//!
6//! # Connection Types
7//!
8//! - [`RedisConnection`]: Single-node Redis connection wrapper
9//! - [`ClusterConnection`] (with `cluster` feature): Redis Cluster connection
10//! - [`RedisConn`]: Unified connection enum that works with both
11//!
12//! # URL Schemes
13//!
14//! - `redis://host:port` - Single-node connection
15//! - `redis+cluster://host:port` - Cluster connection (connects via initial node)
16//! - Multiple URLs - Cluster connection with multiple initial nodes
17//!
18//! # Example
19//!
20//! ```ignore
21//! use polars_redis::connection::{RedisConnection, ConnectionConfig};
22//!
23//! // Single node
24//! let conn = RedisConnection::new("redis://localhost:6379")?;
25//!
26//! // Cluster (with cluster feature)
27//! let conn = RedisConnection::from_config(ConnectionConfig::cluster(&["redis://node1:7000"]))?;
28//! ```
29
30use crate::error::{Error, Result};
31use redis::aio::{ConnectionManager, MultiplexedConnection};
32#[cfg(feature = "cluster")]
33use redis::cluster::ClusterClient;
34#[cfg(feature = "cluster")]
35use redis::cluster_async::ClusterConnection;
36use redis::{Client, Cmd, FromRedisValue, Pipeline};
37
/// Configuration for Redis connections.
///
/// Describes which kind of client should be built: a single-node client, or
/// (when the `cluster` feature is enabled) a cluster client seeded with one or
/// more initial nodes.
#[derive(Debug, Clone)]
pub enum ConnectionConfig {
    /// Single Redis node connection.
    Single {
        /// Redis URL (e.g., "redis://localhost:6379")
        url: String,
    },
    /// Redis Cluster connection.
    #[cfg(feature = "cluster")]
    Cluster {
        /// Initial cluster node URLs
        nodes: Vec<String>,
    },
}

impl ConnectionConfig {
    /// Create a single-node configuration from anything convertible to a `String`.
    pub fn single(url: impl Into<String>) -> Self {
        Self::Single { url: url.into() }
    }

    /// Create a cluster configuration from a list of initial node URLs.
    ///
    /// Each entry is copied into an owned `String`; the URLs are stored as given.
    #[cfg(feature = "cluster")]
    pub fn cluster(nodes: &[impl AsRef<str>]) -> Self {
        Self::Cluster {
            nodes: nodes.iter().map(|node| node.as_ref().to_owned()).collect(),
        }
    }

    /// Parse a URL and determine the connection type.
    ///
    /// With the `cluster` feature enabled, a URL whose scheme is
    /// `redis+cluster://` yields a cluster configuration whose single initial
    /// node is the same URL with the scheme rewritten to `redis://`.
    /// Every other URL (and every URL when the `cluster` feature is disabled)
    /// yields a single-node configuration holding the URL unchanged.
    pub fn from_url(url: &str) -> Self {
        // Only the leading scheme is rewritten. `strip_prefix` leaves any later
        // occurrence of "redis+cluster://" (e.g. inside credentials or a query
        // string) untouched, which a blanket `replace` would corrupt.
        #[cfg(feature = "cluster")]
        if let Some(rest) = url.strip_prefix("redis+cluster://") {
            return Self::Cluster {
                nodes: vec![format!("redis://{}", rest)],
            };
        }

        Self::single(url)
    }

    /// Check if this is a cluster configuration.
    ///
    /// Always `false` when the `cluster` feature is disabled, because the
    /// `Cluster` variant does not exist in that build.
    pub fn is_cluster(&self) -> bool {
        // Exhaustive match: adding a variant later forces a decision here.
        match self {
            Self::Single { .. } => false,
            #[cfg(feature = "cluster")]
            Self::Cluster { .. } => true,
        }
    }
}
98
99/// Unified async connection that works for both single-node and cluster.
100///
101/// This enum abstracts over the different connection types, providing a unified
102/// interface for executing Redis commands.
103#[derive(Clone)]
104pub enum RedisConn {
105    /// Single-node connection with auto-reconnect.
106    Single(ConnectionManager),
107    /// Cluster connection (cloneable, thread-safe, internally pooled).
108    #[cfg(feature = "cluster")]
109    Cluster(ClusterConnection),
110}
111
112impl RedisConn {
113    /// Execute a Redis command and parse the result.
114    pub async fn query_async<T: FromRedisValue>(&mut self, cmd: &Cmd) -> Result<T> {
115        match self {
116            Self::Single(conn) => cmd.query_async(conn).await.map_err(Error::Connection),
117            #[cfg(feature = "cluster")]
118            Self::Cluster(conn) => cmd.query_async(conn).await.map_err(Error::Connection),
119        }
120    }
121
122    /// Execute a pipeline and parse the results.
123    pub async fn pipe_query_async<T: FromRedisValue>(&mut self, pipe: &Pipeline) -> Result<T> {
124        match self {
125            Self::Single(conn) => pipe.query_async(conn).await.map_err(Error::Connection),
126            #[cfg(feature = "cluster")]
127            Self::Cluster(conn) => pipe.query_async(conn).await.map_err(Error::Connection),
128        }
129    }
130
131    /// Check if this is a cluster connection.
132    pub fn is_cluster(&self) -> bool {
133        #[cfg(feature = "cluster")]
134        if matches!(self, Self::Cluster(_)) {
135            return true;
136        }
137        false
138    }
139
140    /// Get the underlying ConnectionManager (panics if cluster).
141    ///
142    /// Use this only when you specifically need single-node connection features.
143    pub fn as_single(&mut self) -> &mut ConnectionManager {
144        match self {
145            Self::Single(conn) => conn,
146            #[cfg(feature = "cluster")]
147            Self::Cluster(_) => panic!("Cannot get single connection from cluster"),
148        }
149    }
150
151    /// Get the underlying ClusterConnection (panics if single).
152    ///
153    /// Use this only when you specifically need cluster connection features.
154    #[cfg(feature = "cluster")]
155    pub fn as_cluster(&mut self) -> &mut ClusterConnection {
156        match self {
157            Self::Cluster(conn) => conn,
158            Self::Single(_) => panic!("Cannot get cluster connection from single"),
159        }
160    }
161}
162
163/// Redis connection wrapper that manages connection lifecycle.
164///
165/// Supports both single-node and cluster connections based on configuration.
166pub struct RedisConnection {
167    config: ConnectionConfig,
168    client: Option<Client>,
169    #[cfg(feature = "cluster")]
170    cluster_client: Option<ClusterClient>,
171}
172
173impl RedisConnection {
174    /// Create a new Redis connection from a URL.
175    ///
176    /// # Arguments
177    /// * `url` - Redis connection URL (e.g., "redis://localhost:6379")
178    ///
179    /// For cluster connections, use `redis+cluster://` scheme or [`from_config`].
180    ///
181    /// # Examples
182    /// ```ignore
183    /// // Single node
184    /// let conn = RedisConnection::new("redis://localhost:6379")?;
185    ///
186    /// // Cluster (auto-detected from scheme)
187    /// let conn = RedisConnection::new("redis+cluster://node1:7000")?;
188    /// ```
189    pub fn new(url: &str) -> Result<Self> {
190        let config = ConnectionConfig::from_url(url);
191        Self::from_config(config)
192    }
193
194    /// Create a connection from a configuration.
195    ///
196    /// # Examples
197    /// ```ignore
198    /// // Single node
199    /// let config = ConnectionConfig::single("redis://localhost:6379");
200    /// let conn = RedisConnection::from_config(config)?;
201    ///
202    /// // Cluster with multiple initial nodes
203    /// let config = ConnectionConfig::cluster(&[
204    ///     "redis://node1:7000",
205    ///     "redis://node2:7001",
206    /// ]);
207    /// let conn = RedisConnection::from_config(config)?;
208    /// ```
209    pub fn from_config(config: ConnectionConfig) -> Result<Self> {
210        match &config {
211            ConnectionConfig::Single { url } => {
212                let client = Client::open(url.as_str())
213                    .map_err(|e| Error::InvalidUrl(format!("{}: {}", url, e)))?;
214                Ok(Self {
215                    config,
216                    client: Some(client),
217                    #[cfg(feature = "cluster")]
218                    cluster_client: None,
219                })
220            }
221            #[cfg(feature = "cluster")]
222            ConnectionConfig::Cluster { nodes } => {
223                let cluster_client = ClusterClient::new(nodes.clone())
224                    .map_err(|e| Error::InvalidUrl(format!("cluster: {}", e)))?;
225                Ok(Self {
226                    config,
227                    client: None,
228                    cluster_client: Some(cluster_client),
229                })
230            }
231        }
232    }
233
234    /// Create a cluster connection from multiple node URLs.
235    ///
236    /// # Arguments
237    /// * `nodes` - List of initial cluster node URLs
238    ///
239    /// # Examples
240    /// ```ignore
241    /// let conn = RedisConnection::new_cluster(&[
242    ///     "redis://node1:7000",
243    ///     "redis://node2:7001",
244    ///     "redis://node3:7002",
245    /// ])?;
246    /// ```
247    #[cfg(feature = "cluster")]
248    pub fn new_cluster(nodes: &[impl AsRef<str>]) -> Result<Self> {
249        let config = ConnectionConfig::cluster(nodes);
250        Self::from_config(config)
251    }
252
253    /// Check if this is a cluster connection.
254    pub fn is_cluster(&self) -> bool {
255        self.config.is_cluster()
256    }
257
258    /// Get a unified async connection.
259    ///
260    /// Returns a [`RedisConn`] that works with both single-node and cluster.
261    pub async fn get_connection(&self) -> Result<RedisConn> {
262        match &self.config {
263            ConnectionConfig::Single { .. } => {
264                let manager = self.get_connection_manager().await?;
265                Ok(RedisConn::Single(manager))
266            }
267            #[cfg(feature = "cluster")]
268            ConnectionConfig::Cluster { .. } => {
269                let cluster = self.get_cluster_connection().await?;
270                Ok(RedisConn::Cluster(cluster))
271            }
272        }
273    }
274
275    /// Get an async multiplexed connection (single-node only).
276    pub async fn get_async_connection(&self) -> Result<MultiplexedConnection> {
277        let client = self.client.as_ref().ok_or_else(|| {
278            Error::Runtime("Cannot get async connection from cluster config".to_string())
279        })?;
280        client
281            .get_multiplexed_async_connection()
282            .await
283            .map_err(Error::Connection)
284    }
285
286    /// Get a ConnectionManager for async operations with auto-reconnection.
287    ///
288    /// ConnectionManager is cheap to clone and provides automatic reconnection
289    /// on connection failures. Preferred over `get_async_connection()` for
290    /// long-running operations.
291    pub async fn get_connection_manager(&self) -> Result<ConnectionManager> {
292        let client = self.client.as_ref().ok_or_else(|| {
293            Error::Runtime("Cannot get connection manager from cluster config".to_string())
294        })?;
295        ConnectionManager::new(client.clone())
296            .await
297            .map_err(Error::Connection)
298    }
299
300    /// Get a cluster async connection.
301    ///
302    /// ClusterConnection is cheap to clone, thread-safe, and has internal pooling.
303    #[cfg(feature = "cluster")]
304    pub async fn get_cluster_connection(&self) -> Result<ClusterConnection> {
305        let cluster_client = self.cluster_client.as_ref().ok_or_else(|| {
306            Error::Runtime("Cannot get cluster connection from single-node config".to_string())
307        })?;
308        cluster_client
309            .get_async_connection()
310            .await
311            .map_err(Error::Connection)
312    }
313
314    /// Get a sync connection (for simple operations, single-node only).
315    pub fn get_sync_connection(&self) -> Result<redis::Connection> {
316        let client = self.client.as_ref().ok_or_else(|| {
317            Error::Runtime("Cannot get sync connection from cluster config".to_string())
318        })?;
319        client.get_connection().map_err(Error::Connection)
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// A string that is not a Redis URL must be rejected at construction.
    #[test]
    fn test_invalid_url() {
        assert!(RedisConnection::new("not-a-valid-url").is_err());
    }

    /// A well-formed URL should be accepted even without a running Redis,
    /// and must be classified as single-node.
    #[test]
    fn test_valid_url_parsing() {
        let outcome = RedisConnection::new("redis://localhost:6379");
        assert!(outcome.is_ok());
        let conn = outcome.unwrap();
        assert!(!conn.is_cluster());
    }

    /// The explicit single-node constructor never reports cluster mode.
    #[test]
    fn test_connection_config_single() {
        assert!(!ConnectionConfig::single("redis://localhost:6379").is_cluster());
    }

    /// A plain `redis://` URL parses to a single-node configuration.
    #[test]
    fn test_connection_config_from_url_single() {
        assert!(!ConnectionConfig::from_url("redis://localhost:6379").is_cluster());
    }

    /// The explicit cluster constructor reports cluster mode.
    #[cfg(feature = "cluster")]
    #[test]
    fn test_connection_config_cluster() {
        let nodes = ["redis://node1:7000", "redis://node2:7001"];
        assert!(ConnectionConfig::cluster(&nodes).is_cluster());
    }

    /// The `redis+cluster://` scheme selects a cluster configuration.
    #[cfg(feature = "cluster")]
    #[test]
    fn test_connection_config_from_url_cluster() {
        assert!(ConnectionConfig::from_url("redis+cluster://node1:7000").is_cluster());
    }

    /// Building a cluster wrapper should succeed even without a running cluster.
    #[cfg(feature = "cluster")]
    #[test]
    fn test_cluster_connection_creation() {
        let outcome = RedisConnection::new_cluster(&["redis://localhost:7000"]);
        assert!(outcome.is_ok());
        let conn = outcome.unwrap();
        assert!(conn.is_cluster());
    }
}