redis_oxide/
client.rs

1//! High-level Redis client
2//!
3//! This module provides the main `Client` interface for interacting with Redis.
4
5use crate::cluster::{calculate_slot, ClusterTopology, RedirectHandler};
6use crate::commands::{
7    Command, DecrByCommand, DecrCommand, DelCommand, ExistsCommand, ExpireCommand, GetCommand,
8    IncrByCommand, IncrCommand, SetCommand, TtlCommand,
9};
10use crate::connection::{ConnectionManager, TopologyType};
11use crate::core::{
12    config::ConnectionConfig,
13    error::{RedisError, RedisResult},
14    value::RespValue,
15};
16use crate::pool::Pool;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::RwLock;
21use tracing::{debug, info, warn};
22
23/// High-level Redis client
24///
25/// Automatically handles:
26/// - Topology detection (Standalone vs Cluster)
27/// - MOVED and ASK redirects in cluster mode
28/// - Connection pooling (multiplexed or traditional)
29/// - Reconnection with exponential backoff
30#[derive(Clone)]
31pub struct Client {
32    topology_type: TopologyType,
33    config: ConnectionConfig,
34    /// For standalone: single pool
35    standalone_pool: Option<Arc<Pool>>,
36    /// For cluster: pools per node
37    cluster_pools: Arc<RwLock<HashMap<String, Arc<Pool>>>>,
38    cluster_topology: Option<ClusterTopology>,
39    redirect_handler: Option<RedirectHandler>,
40}
41
42impl Client {
43    /// Connect to Redis with the given configuration
44    ///
45    /// This will automatically detect whether you're connecting to a
46    /// standalone Redis server or a Redis Cluster.
47    ///
48    /// # Example
49    ///
50    /// ```no_run
51    /// use redis_oxide::{Client, ConnectionConfig};
52    ///
53    /// #[tokio::main]
54    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
55    ///     let config = ConnectionConfig::new("redis://localhost:6379");
56    ///     let client = Client::connect(config).await?;
57    ///     Ok(())
58    /// }
59    /// ```
60    pub async fn connect(config: ConnectionConfig) -> RedisResult<Self> {
61        info!("Connecting to Redis...");
62
63        let mut conn_manager = ConnectionManager::new(config.clone());
64        let topology_type = conn_manager.get_topology().await?;
65
66        match topology_type {
67            TopologyType::Standalone => Self::connect_standalone(config, conn_manager).await,
68            TopologyType::Cluster => Self::connect_cluster(config, conn_manager).await,
69        }
70    }
71
72    async fn connect_standalone(
73        config: ConnectionConfig,
74        _conn_manager: ConnectionManager,
75    ) -> RedisResult<Self> {
76        info!("Connecting to Standalone Redis");
77
78        let endpoints = config.parse_endpoints();
79        if endpoints.is_empty() {
80            return Err(RedisError::Config("No endpoints specified".to_string()));
81        }
82
83        let (host, port) = endpoints[0].clone();
84        let pool = Pool::new(config.clone(), host, port).await?;
85
86        Ok(Self {
87            topology_type: TopologyType::Standalone,
88            config,
89            standalone_pool: Some(Arc::new(pool)),
90            cluster_pools: Arc::new(RwLock::new(HashMap::new())),
91            cluster_topology: None,
92            redirect_handler: None,
93        })
94    }
95
96    async fn connect_cluster(
97        config: ConnectionConfig,
98        _conn_manager: ConnectionManager,
99    ) -> RedisResult<Self> {
100        info!("Connecting to Redis Cluster");
101
102        let cluster_topology = ClusterTopology::new();
103        let redirect_handler = RedirectHandler::new(cluster_topology.clone(), config.max_redirects);
104
105        // Initialize cluster topology by connecting to seed nodes
106        let endpoints = config.parse_endpoints();
107        if endpoints.is_empty() {
108            return Err(RedisError::Config("No endpoints specified".to_string()));
109        }
110
111        // Try to get cluster slots from first available node
112        for (host, port) in &endpoints {
113            match Pool::new(config.clone(), host.clone(), *port).await {
114                Ok(pool) => {
115                    // Store the initial pool
116                    let node_key = format!("{}:{}", host, port);
117                    let mut pools = HashMap::new();
118                    pools.insert(node_key, Arc::new(pool));
119
120                    return Ok(Self {
121                        topology_type: TopologyType::Cluster,
122                        config,
123                        standalone_pool: None,
124                        cluster_pools: Arc::new(RwLock::new(pools)),
125                        cluster_topology: Some(cluster_topology),
126                        redirect_handler: Some(redirect_handler),
127                    });
128                }
129                Err(e) => {
130                    warn!(
131                        "Failed to connect to cluster node {}:{}: {:?}",
132                        host, port, e
133                    );
134                    // Try next node
135                }
136            }
137        }
138
139        Err(RedisError::Cluster(
140            "Failed to connect to any cluster node".to_string(),
141        ))
142    }
143
144    /// Execute a command with automatic redirect handling
145    async fn execute_with_redirects<C: Command>(&self, command: C) -> RedisResult<C::Output> {
146        let mut retries = 0;
147        let max_retries = self.config.max_redirects;
148
149        loop {
150            let result = self.execute_command_internal(&command).await;
151
152            match result {
153                Err(ref e) if e.is_redirect() && retries < max_retries => {
154                    retries += 1;
155                    debug!(
156                        "Handling redirect (attempt {}/{}): {:?}",
157                        retries, max_retries, e
158                    );
159
160                    if let Some(ref handler) = self.redirect_handler {
161                        let (host, port, is_ask) = handler.handle_redirect(e).await?;
162
163                        // Ensure we have a pool for the target node
164                        self.ensure_node_pool(&host, port).await?;
165
166                        // For ASK redirects, we need to send ASKING command first
167                        if is_ask {
168                            let node_key = format!("{}:{}", host, port);
169                            if let Some(pool) = self.get_cluster_pool(&node_key).await {
170                                // Send ASKING command
171                                let _ = pool.execute_command("ASKING".to_string(), vec![]).await?;
172                            }
173                        }
174
175                        // Retry the command
176                        continue;
177                    }
178
179                    // No redirect handler available
180                    return Err(RedisError::Cluster(
181                        "Redirect received but no handler available".to_string(),
182                    ));
183                }
184                Err(e) if e.is_redirect() => {
185                    return Err(RedisError::MaxRetriesExceeded(max_retries));
186                }
187                other => {
188                    return other
189                        .map(|resp| command.parse_response(resp))
190                        .and_then(|x| x)
191                }
192            }
193        }
194    }
195
196    async fn execute_command_internal<C: Command>(&self, command: &C) -> RedisResult<RespValue> {
197        match self.topology_type {
198            TopologyType::Standalone => {
199                if let Some(ref pool) = self.standalone_pool {
200                    pool.execute_command(command.command_name().to_string(), command.args())
201                        .await
202                } else {
203                    Err(RedisError::Connection(
204                        "No standalone pool available".to_string(),
205                    ))
206                }
207            }
208            TopologyType::Cluster => {
209                // Get the key and calculate slot
210                let keys = command.keys();
211                if keys.is_empty() {
212                    return Err(RedisError::Cluster("Command has no keys".to_string()));
213                }
214
215                let slot = calculate_slot(keys[0]);
216                debug!("Command key slot: {}", slot);
217
218                // Try to get node from topology
219                let node_key = if let Some(ref topology) = self.cluster_topology {
220                    if let Some((host, port)) = topology.get_node_for_slot(slot).await {
221                        Some(format!("{}:{}", host, port))
222                    } else {
223                        None
224                    }
225                } else {
226                    None
227                };
228
229                // Get pool for the node
230                let pool = if let Some(ref key) = node_key {
231                    self.get_cluster_pool(key).await
232                } else {
233                    // No topology info, use any available pool
234                    self.get_any_cluster_pool().await
235                };
236
237                if let Some(pool) = pool {
238                    pool.execute_command(command.command_name().to_string(), command.args())
239                        .await
240                } else {
241                    Err(RedisError::Cluster(
242                        "No cluster pools available".to_string(),
243                    ))
244                }
245            }
246        }
247    }
248
249    async fn get_cluster_pool(&self, node_key: &str) -> Option<Arc<Pool>> {
250        let pools = self.cluster_pools.read().await;
251        pools.get(node_key).cloned()
252    }
253
254    async fn get_any_cluster_pool(&self) -> Option<Arc<Pool>> {
255        let pools = self.cluster_pools.read().await;
256        pools.values().next().cloned()
257    }
258
259    async fn ensure_node_pool(&self, host: &str, port: u16) -> RedisResult<()> {
260        let node_key = format!("{}:{}", host, port);
261
262        // Check if pool already exists
263        {
264            let pools = self.cluster_pools.read().await;
265            if pools.contains_key(&node_key) {
266                return Ok(());
267            }
268        }
269
270        // Create new pool
271        let pool = Pool::new(self.config.clone(), host.to_string(), port).await?;
272
273        // Insert into pools
274        let mut pools = self.cluster_pools.write().await;
275        pools.insert(node_key, Arc::new(pool));
276
277        Ok(())
278    }
279
280    // High-level command methods
281
282    /// Get the value of a key
283    pub async fn get(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
284        let command = GetCommand::new(key);
285        self.execute_with_redirects(command).await
286    }
287
288    /// Set the value of a key
289    pub async fn set(&self, key: impl Into<String>, value: impl Into<String>) -> RedisResult<bool> {
290        let command = SetCommand::new(key, value);
291        self.execute_with_redirects(command).await
292    }
293
294    /// Set the value of a key with expiration
295    pub async fn set_ex(
296        &self,
297        key: impl Into<String>,
298        value: impl Into<String>,
299        expiration: Duration,
300    ) -> RedisResult<bool> {
301        let command = SetCommand::new(key, value).expire(expiration);
302        self.execute_with_redirects(command).await
303    }
304
305    /// Set the value of a key only if it doesn't exist
306    pub async fn set_nx(
307        &self,
308        key: impl Into<String>,
309        value: impl Into<String>,
310    ) -> RedisResult<bool> {
311        let command = SetCommand::new(key, value).only_if_not_exists();
312        self.execute_with_redirects(command).await
313    }
314
315    /// Delete one or more keys
316    pub async fn del(&self, keys: Vec<String>) -> RedisResult<i64> {
317        let command = DelCommand::new(keys);
318        self.execute_with_redirects(command).await
319    }
320
321    /// Check if one or more keys exist
322    pub async fn exists(&self, keys: Vec<String>) -> RedisResult<i64> {
323        let command = ExistsCommand::new(keys);
324        self.execute_with_redirects(command).await
325    }
326
327    /// Set a key's time to live in seconds
328    pub async fn expire(&self, key: impl Into<String>, duration: Duration) -> RedisResult<bool> {
329        let command = ExpireCommand::new(key, duration);
330        self.execute_with_redirects(command).await
331    }
332
333    /// Get the time to live for a key
334    pub async fn ttl(&self, key: impl Into<String>) -> RedisResult<Option<i64>> {
335        let command = TtlCommand::new(key);
336        self.execute_with_redirects(command).await
337    }
338
339    /// Increment the integer value of a key by one
340    pub async fn incr(&self, key: impl Into<String>) -> RedisResult<i64> {
341        let command = IncrCommand::new(key);
342        self.execute_with_redirects(command).await
343    }
344
345    /// Decrement the integer value of a key by one
346    pub async fn decr(&self, key: impl Into<String>) -> RedisResult<i64> {
347        let command = DecrCommand::new(key);
348        self.execute_with_redirects(command).await
349    }
350
351    /// Increment the integer value of a key by the given amount
352    pub async fn incr_by(&self, key: impl Into<String>, increment: i64) -> RedisResult<i64> {
353        let command = IncrByCommand::new(key, increment);
354        self.execute_with_redirects(command).await
355    }
356
357    /// Decrement the integer value of a key by the given amount
358    pub async fn decr_by(&self, key: impl Into<String>, decrement: i64) -> RedisResult<i64> {
359        let command = DecrByCommand::new(key, decrement);
360        self.execute_with_redirects(command).await
361    }
362
363    /// Get the topology type
364    pub fn topology_type(&self) -> TopologyType {
365        self.topology_type
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372
373    #[test]
374    fn test_client_configuration() {
375        let config = ConnectionConfig::new("redis://localhost:6379");
376        assert!(!config.connection_string.is_empty());
377    }
378}