polars_redis/
connection.rs1use crate::error::{Error, Result};
31use redis::aio::{ConnectionManager, MultiplexedConnection};
32#[cfg(feature = "cluster")]
33use redis::cluster::ClusterClient;
34#[cfg(feature = "cluster")]
35use redis::cluster_async::ClusterConnection;
36use redis::{Client, Cmd, FromRedisValue, Pipeline};
37
/// Describes which kind of Redis deployment a connection should target.
///
/// The `Cluster` variant (and everything that produces it) only exists when
/// the crate is built with the `cluster` feature.
#[derive(Debug, Clone)]
pub enum ConnectionConfig {
    /// A single Redis endpoint addressed by one URL.
    Single {
        /// Connection URL, stored exactly as supplied by the caller.
        url: String,
    },
    /// A Redis Cluster addressed by one or more node URLs.
    #[cfg(feature = "cluster")]
    Cluster {
        /// URLs of the cluster nodes.
        nodes: Vec<String>,
    },
}

impl ConnectionConfig {
    /// Builds a single-endpoint configuration from `url`.
    ///
    /// The URL is stored as-is; no validation happens here.
    pub fn single(url: impl Into<String>) -> Self {
        Self::Single { url: url.into() }
    }

    /// Builds a cluster configuration from a list of node URLs.
    ///
    /// Each entry is copied into an owned `String`; no validation happens here.
    #[cfg(feature = "cluster")]
    pub fn cluster(nodes: &[impl AsRef<str>]) -> Self {
        Self::Cluster {
            nodes: nodes.iter().map(|node| node.as_ref().to_string()).collect(),
        }
    }

    /// Derives a configuration from a URL by looking at its scheme.
    ///
    /// With the `cluster` feature enabled, a URL starting with
    /// `redis+cluster://` yields a `Cluster` configuration whose only node is
    /// the same URL with the scheme rewritten to `redis://`. Every other URL
    /// (and every URL when the feature is disabled) yields a `Single`
    /// configuration holding the URL unchanged.
    pub fn from_url(url: &str) -> Self {
        // Only the leading scheme is rewritten. A blanket `replace` would also
        // alter any later occurrence of the marker (e.g. inside credentials).
        #[cfg(feature = "cluster")]
        if let Some(rest) = url.strip_prefix("redis+cluster://") {
            return Self::Cluster {
                nodes: vec![format!("redis://{}", rest)],
            };
        }

        Self::Single {
            url: url.to_string(),
        }
    }

    /// Returns `true` when this configuration targets a cluster.
    ///
    /// Always `false` when the `cluster` feature is disabled, because the
    /// `Cluster` variant does not exist in that build.
    pub fn is_cluster(&self) -> bool {
        // Exhaustive match: adding a variant later forces a decision here.
        match self {
            Self::Single { .. } => false,
            #[cfg(feature = "cluster")]
            Self::Cluster { .. } => true,
        }
    }
}
98
99#[derive(Clone)]
104pub enum RedisConn {
105 Single(ConnectionManager),
107 #[cfg(feature = "cluster")]
109 Cluster(ClusterConnection),
110}
111
112impl RedisConn {
113 pub async fn query_async<T: FromRedisValue>(&mut self, cmd: &Cmd) -> Result<T> {
115 match self {
116 Self::Single(conn) => cmd.query_async(conn).await.map_err(Error::Connection),
117 #[cfg(feature = "cluster")]
118 Self::Cluster(conn) => cmd.query_async(conn).await.map_err(Error::Connection),
119 }
120 }
121
122 pub async fn pipe_query_async<T: FromRedisValue>(&mut self, pipe: &Pipeline) -> Result<T> {
124 match self {
125 Self::Single(conn) => pipe.query_async(conn).await.map_err(Error::Connection),
126 #[cfg(feature = "cluster")]
127 Self::Cluster(conn) => pipe.query_async(conn).await.map_err(Error::Connection),
128 }
129 }
130
131 pub fn is_cluster(&self) -> bool {
133 #[cfg(feature = "cluster")]
134 if matches!(self, Self::Cluster(_)) {
135 return true;
136 }
137 false
138 }
139
140 pub fn as_single(&mut self) -> &mut ConnectionManager {
144 match self {
145 Self::Single(conn) => conn,
146 #[cfg(feature = "cluster")]
147 Self::Cluster(_) => panic!("Cannot get single connection from cluster"),
148 }
149 }
150
151 #[cfg(feature = "cluster")]
155 pub fn as_cluster(&mut self) -> &mut ClusterConnection {
156 match self {
157 Self::Cluster(conn) => conn,
158 Self::Single(_) => panic!("Cannot get cluster connection from single"),
159 }
160 }
161}
162
163pub struct RedisConnection {
167 config: ConnectionConfig,
168 client: Option<Client>,
169 #[cfg(feature = "cluster")]
170 cluster_client: Option<ClusterClient>,
171}
172
173impl RedisConnection {
174 pub fn new(url: &str) -> Result<Self> {
190 let config = ConnectionConfig::from_url(url);
191 Self::from_config(config)
192 }
193
194 pub fn from_config(config: ConnectionConfig) -> Result<Self> {
210 match &config {
211 ConnectionConfig::Single { url } => {
212 let client = Client::open(url.as_str())
213 .map_err(|e| Error::InvalidUrl(format!("{}: {}", url, e)))?;
214 Ok(Self {
215 config,
216 client: Some(client),
217 #[cfg(feature = "cluster")]
218 cluster_client: None,
219 })
220 }
221 #[cfg(feature = "cluster")]
222 ConnectionConfig::Cluster { nodes } => {
223 let cluster_client = ClusterClient::new(nodes.clone())
224 .map_err(|e| Error::InvalidUrl(format!("cluster: {}", e)))?;
225 Ok(Self {
226 config,
227 client: None,
228 cluster_client: Some(cluster_client),
229 })
230 }
231 }
232 }
233
234 #[cfg(feature = "cluster")]
248 pub fn new_cluster(nodes: &[impl AsRef<str>]) -> Result<Self> {
249 let config = ConnectionConfig::cluster(nodes);
250 Self::from_config(config)
251 }
252
253 pub fn is_cluster(&self) -> bool {
255 self.config.is_cluster()
256 }
257
258 pub async fn get_connection(&self) -> Result<RedisConn> {
262 match &self.config {
263 ConnectionConfig::Single { .. } => {
264 let manager = self.get_connection_manager().await?;
265 Ok(RedisConn::Single(manager))
266 }
267 #[cfg(feature = "cluster")]
268 ConnectionConfig::Cluster { .. } => {
269 let cluster = self.get_cluster_connection().await?;
270 Ok(RedisConn::Cluster(cluster))
271 }
272 }
273 }
274
275 pub async fn get_async_connection(&self) -> Result<MultiplexedConnection> {
277 let client = self.client.as_ref().ok_or_else(|| {
278 Error::Runtime("Cannot get async connection from cluster config".to_string())
279 })?;
280 client
281 .get_multiplexed_async_connection()
282 .await
283 .map_err(Error::Connection)
284 }
285
286 pub async fn get_connection_manager(&self) -> Result<ConnectionManager> {
292 let client = self.client.as_ref().ok_or_else(|| {
293 Error::Runtime("Cannot get connection manager from cluster config".to_string())
294 })?;
295 ConnectionManager::new(client.clone())
296 .await
297 .map_err(Error::Connection)
298 }
299
300 #[cfg(feature = "cluster")]
304 pub async fn get_cluster_connection(&self) -> Result<ClusterConnection> {
305 let cluster_client = self.cluster_client.as_ref().ok_or_else(|| {
306 Error::Runtime("Cannot get cluster connection from single-node config".to_string())
307 })?;
308 cluster_client
309 .get_async_connection()
310 .await
311 .map_err(Error::Connection)
312 }
313
314 pub fn get_sync_connection(&self) -> Result<redis::Connection> {
316 let client = self.client.as_ref().ok_or_else(|| {
317 Error::Runtime("Cannot get sync connection from cluster config".to_string())
318 })?;
319 client.get_connection().map_err(Error::Connection)
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// A string that is not a Redis URL must be rejected at construction.
    #[test]
    fn test_invalid_url() {
        assert!(RedisConnection::new("not-a-valid-url").is_err());
    }

    /// A well-formed `redis://` URL constructs a non-cluster connection.
    #[test]
    fn test_valid_url_parsing() {
        // NOTE(review): this presumably relies on construction not opening a
        // socket, since no server is started by the test; confirm.
        let conn = RedisConnection::new("redis://localhost:6379");
        assert!(conn.is_ok());
        assert!(!conn.unwrap().is_cluster());
    }

    /// `ConnectionConfig::single` never reports itself as a cluster.
    #[test]
    fn test_connection_config_single() {
        assert!(!ConnectionConfig::single("redis://localhost:6379").is_cluster());
    }

    /// A plain `redis://` URL is classified as single-node.
    #[test]
    fn test_connection_config_from_url_single() {
        assert!(!ConnectionConfig::from_url("redis://localhost:6379").is_cluster());
    }

    /// `ConnectionConfig::cluster` reports itself as a cluster.
    #[cfg(feature = "cluster")]
    #[test]
    fn test_connection_config_cluster() {
        let node_urls = ["redis://node1:7000", "redis://node2:7001"];
        assert!(ConnectionConfig::cluster(&node_urls).is_cluster());
    }

    /// A `redis+cluster://` URL is classified as a cluster.
    #[cfg(feature = "cluster")]
    #[test]
    fn test_connection_config_from_url_cluster() {
        assert!(ConnectionConfig::from_url("redis+cluster://node1:7000").is_cluster());
    }

    /// Building from a node list yields a cluster connection.
    #[cfg(feature = "cluster")]
    #[test]
    fn test_cluster_connection_creation() {
        let conn = RedisConnection::new_cluster(&["redis://localhost:7000"]);
        assert!(conn.is_ok());
        assert!(conn.unwrap().is_cluster());
    }
}