redis_oxide/
client.rs

1//! High-level Redis client
2//!
3//! This module provides the main `Client` interface for interacting with Redis.
4
5use crate::cluster::{calculate_slot, ClusterTopology, RedirectHandler};
6use crate::commands::{
7    Command,
8    DecrByCommand,
9    DecrCommand,
10    DelCommand,
11    ExistsCommand,
12    ExpireCommand,
13    GetCommand,
14    HDelCommand,
15    HExistsCommand,
16    HGetAllCommand,
17    HGetCommand,
18    HLenCommand,
19    HMGetCommand,
20    HMSetCommand,
21    HSetCommand,
22    IncrByCommand,
23    IncrCommand,
24    // List commands
25    LIndexCommand,
26    LLenCommand,
27    LPopCommand,
28    LPushCommand,
29    LRangeCommand,
30    LSetCommand,
31    RPopCommand,
32    RPushCommand,
33    // Set commands
34    SAddCommand,
35    SCardCommand,
36    SIsMemberCommand,
37    SMembersCommand,
38    SPopCommand,
39    SRandMemberCommand,
40    SRemCommand,
41    SetCommand,
42    TtlCommand,
43    // Sorted Set commands
44    ZAddCommand,
45    ZCardCommand,
46    ZRangeCommand,
47    ZRankCommand,
48    ZRemCommand,
49    ZRevRankCommand,
50    ZScoreCommand,
51};
52use crate::connection::{ConnectionManager, TopologyType};
53use crate::core::{
54    config::ConnectionConfig,
55    error::{RedisError, RedisResult},
56    value::RespValue,
57};
58use crate::pipeline::{Pipeline, PipelineCommand, PipelineExecutor};
59use crate::pool::Pool;
60use crate::pubsub::{PubSubConnection, Publisher, Subscriber};
61use crate::transaction::{Transaction, TransactionCommand, TransactionExecutor};
62use std::collections::HashMap;
63use std::sync::Arc;
64use std::time::Duration;
65use tokio::sync::RwLock;
66use tracing::{debug, info, warn};
67
68/// High-level Redis client
69///
70/// Automatically handles:
71/// - Topology detection (Standalone vs Cluster)
72/// - MOVED and ASK redirects in cluster mode
73/// - Connection pooling (multiplexed or traditional)
74/// - Reconnection with exponential backoff
75#[derive(Clone)]
76pub struct Client {
77    topology_type: TopologyType,
78    config: ConnectionConfig,
79    /// For standalone: single pool
80    standalone_pool: Option<Arc<Pool>>,
81    /// For cluster: pools per node
82    cluster_pools: Arc<RwLock<HashMap<String, Arc<Pool>>>>,
83    cluster_topology: Option<ClusterTopology>,
84    redirect_handler: Option<RedirectHandler>,
85}
86
87impl Client {
88    /// Connect to Redis with the given configuration
89    ///
90    /// This will automatically detect whether you're connecting to a
91    /// standalone Redis server or a Redis Cluster.
92    ///
93    /// # Example
94    ///
95    /// ```no_run
96    /// use redis_oxide::{Client, ConnectionConfig};
97    ///
98    /// #[tokio::main]
99    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
100    ///     let config = ConnectionConfig::new("redis://localhost:6379");
101    ///     let client = Client::connect(config).await?;
102    ///     Ok(())
103    /// }
104    /// ```
105    pub async fn connect(config: ConnectionConfig) -> RedisResult<Self> {
106        info!("Connecting to Redis...");
107
108        let mut conn_manager = ConnectionManager::new(config.clone());
109        let topology_type = conn_manager.get_topology().await?;
110
111        match topology_type {
112            TopologyType::Standalone => Self::connect_standalone(config, conn_manager).await,
113            TopologyType::Cluster => Self::connect_cluster(config, conn_manager).await,
114        }
115    }
116
117    async fn connect_standalone(
118        config: ConnectionConfig,
119        _conn_manager: ConnectionManager,
120    ) -> RedisResult<Self> {
121        info!("Connecting to Standalone Redis");
122
123        let endpoints = config.parse_endpoints();
124        if endpoints.is_empty() {
125            return Err(RedisError::Config("No endpoints specified".to_string()));
126        }
127
128        let (host, port) = endpoints[0].clone();
129        let pool = Pool::new(config.clone(), host, port).await?;
130
131        Ok(Self {
132            topology_type: TopologyType::Standalone,
133            config,
134            standalone_pool: Some(Arc::new(pool)),
135            cluster_pools: Arc::new(RwLock::new(HashMap::new())),
136            cluster_topology: None,
137            redirect_handler: None,
138        })
139    }
140
141    async fn connect_cluster(
142        config: ConnectionConfig,
143        _conn_manager: ConnectionManager,
144    ) -> RedisResult<Self> {
145        info!("Connecting to Redis Cluster");
146
147        let cluster_topology = ClusterTopology::new();
148        let redirect_handler = RedirectHandler::new(cluster_topology.clone(), config.max_redirects);
149
150        // Initialize cluster topology by connecting to seed nodes
151        let endpoints = config.parse_endpoints();
152        if endpoints.is_empty() {
153            return Err(RedisError::Config("No endpoints specified".to_string()));
154        }
155
156        // Try to get cluster slots from first available node
157        for (host, port) in &endpoints {
158            match Pool::new(config.clone(), host.clone(), *port).await {
159                Ok(pool) => {
160                    // Store the initial pool
161                    let node_key = format!("{}:{}", host, port);
162                    let mut pools = HashMap::new();
163                    pools.insert(node_key, Arc::new(pool));
164
165                    return Ok(Self {
166                        topology_type: TopologyType::Cluster,
167                        config,
168                        standalone_pool: None,
169                        cluster_pools: Arc::new(RwLock::new(pools)),
170                        cluster_topology: Some(cluster_topology),
171                        redirect_handler: Some(redirect_handler),
172                    });
173                }
174                Err(e) => {
175                    warn!(
176                        "Failed to connect to cluster node {}:{}: {:?}",
177                        host, port, e
178                    );
179                    // Try next node
180                }
181            }
182        }
183
184        Err(RedisError::Cluster(
185            "Failed to connect to any cluster node".to_string(),
186        ))
187    }
188
189    /// Execute a command with automatic redirect handling
190    async fn execute_with_redirects<C: Command>(&self, command: C) -> RedisResult<C::Output> {
191        let mut retries = 0;
192        let max_retries = self.config.max_redirects;
193
194        loop {
195            let result = self.execute_command_internal(&command).await;
196
197            match result {
198                Err(ref e) if e.is_redirect() && retries < max_retries => {
199                    retries += 1;
200                    debug!(
201                        "Handling redirect (attempt {}/{}): {:?}",
202                        retries, max_retries, e
203                    );
204
205                    if let Some(ref handler) = self.redirect_handler {
206                        let (host, port, is_ask) = handler.handle_redirect(e).await?;
207
208                        // Ensure we have a pool for the target node
209                        self.ensure_node_pool(&host, port).await?;
210
211                        // For ASK redirects, we need to send ASKING command first
212                        if is_ask {
213                            let node_key = format!("{}:{}", host, port);
214                            if let Some(pool) = self.get_cluster_pool(&node_key).await {
215                                // Send ASKING command
216                                let _ = pool.execute_command("ASKING".to_string(), vec![]).await?;
217                            }
218                        }
219
220                        // Retry the command
221                        continue;
222                    }
223
224                    // No redirect handler available
225                    return Err(RedisError::Cluster(
226                        "Redirect received but no handler available".to_string(),
227                    ));
228                }
229                Err(e) if e.is_redirect() => {
230                    return Err(RedisError::MaxRetriesExceeded(max_retries));
231                }
232                other => {
233                    return other
234                        .map(|resp| command.parse_response(resp))
235                        .and_then(|x| x)
236                }
237            }
238        }
239    }
240
241    async fn execute_command_internal<C: Command>(&self, command: &C) -> RedisResult<RespValue> {
242        match self.topology_type {
243            TopologyType::Standalone => {
244                if let Some(ref pool) = self.standalone_pool {
245                    pool.execute_command(command.command_name().to_string(), command.args())
246                        .await
247                } else {
248                    Err(RedisError::Connection(
249                        "No standalone pool available".to_string(),
250                    ))
251                }
252            }
253            TopologyType::Cluster => {
254                // Get the key and calculate slot
255                let keys = command.keys();
256                if keys.is_empty() {
257                    return Err(RedisError::Cluster("Command has no keys".to_string()));
258                }
259
260                let slot = calculate_slot(keys[0]);
261                debug!("Command key slot: {}", slot);
262
263                // Try to get node from topology
264                let node_key = if let Some(ref topology) = self.cluster_topology {
265                    if let Some((host, port)) = topology.get_node_for_slot(slot).await {
266                        Some(format!("{}:{}", host, port))
267                    } else {
268                        None
269                    }
270                } else {
271                    None
272                };
273
274                // Get pool for the node
275                let pool = if let Some(ref key) = node_key {
276                    self.get_cluster_pool(key).await
277                } else {
278                    // No topology info, use any available pool
279                    self.get_any_cluster_pool().await
280                };
281
282                if let Some(pool) = pool {
283                    pool.execute_command(command.command_name().to_string(), command.args())
284                        .await
285                } else {
286                    Err(RedisError::Cluster(
287                        "No cluster pools available".to_string(),
288                    ))
289                }
290            }
291        }
292    }
293
294    async fn get_cluster_pool(&self, node_key: &str) -> Option<Arc<Pool>> {
295        let pools = self.cluster_pools.read().await;
296        pools.get(node_key).cloned()
297    }
298
299    async fn get_any_cluster_pool(&self) -> Option<Arc<Pool>> {
300        let pools = self.cluster_pools.read().await;
301        pools.values().next().cloned()
302    }
303
304    async fn ensure_node_pool(&self, host: &str, port: u16) -> RedisResult<()> {
305        let node_key = format!("{}:{}", host, port);
306
307        // Check if pool already exists
308        {
309            let pools = self.cluster_pools.read().await;
310            if pools.contains_key(&node_key) {
311                return Ok(());
312            }
313        }
314
315        // Create new pool
316        let pool = Pool::new(self.config.clone(), host.to_string(), port).await?;
317
318        // Insert into pools
319        let mut pools = self.cluster_pools.write().await;
320        pools.insert(node_key, Arc::new(pool));
321
322        Ok(())
323    }
324
325    // High-level command methods
326
327    /// Get the value of a key
328    pub async fn get(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
329        let command = GetCommand::new(key);
330        self.execute_with_redirects(command).await
331    }
332
333    /// Set the value of a key
334    pub async fn set(&self, key: impl Into<String>, value: impl Into<String>) -> RedisResult<bool> {
335        let command = SetCommand::new(key, value);
336        self.execute_with_redirects(command).await
337    }
338
339    /// Set the value of a key with expiration
340    pub async fn set_ex(
341        &self,
342        key: impl Into<String>,
343        value: impl Into<String>,
344        expiration: Duration,
345    ) -> RedisResult<bool> {
346        let command = SetCommand::new(key, value).expire(expiration);
347        self.execute_with_redirects(command).await
348    }
349
350    /// Set the value of a key only if it doesn't exist
351    pub async fn set_nx(
352        &self,
353        key: impl Into<String>,
354        value: impl Into<String>,
355    ) -> RedisResult<bool> {
356        let command = SetCommand::new(key, value).only_if_not_exists();
357        self.execute_with_redirects(command).await
358    }
359
360    /// Delete one or more keys
361    pub async fn del(&self, keys: Vec<String>) -> RedisResult<i64> {
362        let command = DelCommand::new(keys);
363        self.execute_with_redirects(command).await
364    }
365
366    /// Check if one or more keys exist
367    pub async fn exists(&self, keys: Vec<String>) -> RedisResult<i64> {
368        let command = ExistsCommand::new(keys);
369        self.execute_with_redirects(command).await
370    }
371
372    /// Set a key's time to live in seconds
373    pub async fn expire(&self, key: impl Into<String>, duration: Duration) -> RedisResult<bool> {
374        let command = ExpireCommand::new(key, duration);
375        self.execute_with_redirects(command).await
376    }
377
378    /// Get the time to live for a key
379    pub async fn ttl(&self, key: impl Into<String>) -> RedisResult<Option<i64>> {
380        let command = TtlCommand::new(key);
381        self.execute_with_redirects(command).await
382    }
383
384    /// Increment the integer value of a key by one
385    pub async fn incr(&self, key: impl Into<String>) -> RedisResult<i64> {
386        let command = IncrCommand::new(key);
387        self.execute_with_redirects(command).await
388    }
389
390    /// Decrement the integer value of a key by one
391    pub async fn decr(&self, key: impl Into<String>) -> RedisResult<i64> {
392        let command = DecrCommand::new(key);
393        self.execute_with_redirects(command).await
394    }
395
396    /// Increment the integer value of a key by the given amount
397    pub async fn incr_by(&self, key: impl Into<String>, increment: i64) -> RedisResult<i64> {
398        let command = IncrByCommand::new(key, increment);
399        self.execute_with_redirects(command).await
400    }
401
402    /// Decrement the integer value of a key by the given amount
403    pub async fn decr_by(&self, key: impl Into<String>, decrement: i64) -> RedisResult<i64> {
404        let command = DecrByCommand::new(key, decrement);
405        self.execute_with_redirects(command).await
406    }
407
408    // Hash operations
409
410    /// Get the value of a hash field
411    pub async fn hget(
412        &self,
413        key: impl Into<String>,
414        field: impl Into<String>,
415    ) -> RedisResult<Option<String>> {
416        let command = HGetCommand::new(key, field);
417        self.execute_with_redirects(command).await
418    }
419
420    /// Set the value of a hash field
421    pub async fn hset(
422        &self,
423        key: impl Into<String>,
424        field: impl Into<String>,
425        value: impl Into<String>,
426    ) -> RedisResult<i64> {
427        let command = HSetCommand::new(key, field, value);
428        self.execute_with_redirects(command).await
429    }
430
431    /// Delete one or more hash fields
432    pub async fn hdel(&self, key: impl Into<String>, fields: Vec<String>) -> RedisResult<i64> {
433        let command = HDelCommand::new(key, fields);
434        self.execute_with_redirects(command).await
435    }
436
437    /// Get all fields and values in a hash
438    pub async fn hgetall(
439        &self,
440        key: impl Into<String>,
441    ) -> RedisResult<std::collections::HashMap<String, String>> {
442        let command = HGetAllCommand::new(key);
443        self.execute_with_redirects(command).await
444    }
445
446    /// Get the values of multiple hash fields
447    pub async fn hmget(
448        &self,
449        key: impl Into<String>,
450        fields: Vec<String>,
451    ) -> RedisResult<Vec<Option<String>>> {
452        let command = HMGetCommand::new(key, fields);
453        self.execute_with_redirects(command).await
454    }
455
456    /// Set multiple hash fields to multiple values
457    pub async fn hmset(
458        &self,
459        key: impl Into<String>,
460        fields: std::collections::HashMap<String, String>,
461    ) -> RedisResult<String> {
462        let command = HMSetCommand::new(key, fields);
463        self.execute_with_redirects(command).await
464    }
465
466    /// Get the number of fields in a hash
467    pub async fn hlen(&self, key: impl Into<String>) -> RedisResult<i64> {
468        let command = HLenCommand::new(key);
469        self.execute_with_redirects(command).await
470    }
471
472    /// Determine if a hash field exists
473    pub async fn hexists(
474        &self,
475        key: impl Into<String>,
476        field: impl Into<String>,
477    ) -> RedisResult<bool> {
478        let command = HExistsCommand::new(key, field);
479        self.execute_with_redirects(command).await
480    }
481
482    // List operations
483
484    /// Push one or more values to the head of a list
485    pub async fn lpush(&self, key: impl Into<String>, values: Vec<String>) -> RedisResult<i64> {
486        let command = LPushCommand::new(key, values);
487        self.execute_with_redirects(command).await
488    }
489
490    /// Push one or more values to the tail of a list
491    pub async fn rpush(&self, key: impl Into<String>, values: Vec<String>) -> RedisResult<i64> {
492        let command = RPushCommand::new(key, values);
493        self.execute_with_redirects(command).await
494    }
495
496    /// Remove and return the first element of a list
497    pub async fn lpop(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
498        let command = LPopCommand::new(key);
499        self.execute_with_redirects(command).await
500    }
501
502    /// Remove and return the last element of a list
503    pub async fn rpop(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
504        let command = RPopCommand::new(key);
505        self.execute_with_redirects(command).await
506    }
507
508    /// Get a range of elements from a list
509    pub async fn lrange(
510        &self,
511        key: impl Into<String>,
512        start: i64,
513        stop: i64,
514    ) -> RedisResult<Vec<String>> {
515        let command = LRangeCommand::new(key, start, stop);
516        self.execute_with_redirects(command).await
517    }
518
519    /// Get the length of a list
520    pub async fn llen(&self, key: impl Into<String>) -> RedisResult<i64> {
521        let command = LLenCommand::new(key);
522        self.execute_with_redirects(command).await
523    }
524
525    /// Get an element from a list by its index
526    pub async fn lindex(&self, key: impl Into<String>, index: i64) -> RedisResult<Option<String>> {
527        let command = LIndexCommand::new(key, index);
528        self.execute_with_redirects(command).await
529    }
530
531    /// Set the value of an element in a list by its index
532    pub async fn lset(
533        &self,
534        key: impl Into<String>,
535        index: i64,
536        value: impl Into<String>,
537    ) -> RedisResult<()> {
538        let command = LSetCommand::new(key, index, value);
539        let _result: String = self.execute_with_redirects(command).await?;
540        Ok(())
541    }
542
543    // Set operations
544
545    /// Add one or more members to a set
546    pub async fn sadd(&self, key: impl Into<String>, members: Vec<String>) -> RedisResult<i64> {
547        let command = SAddCommand::new(key, members);
548        self.execute_with_redirects(command).await
549    }
550
551    /// Remove one or more members from a set
552    pub async fn srem(&self, key: impl Into<String>, members: Vec<String>) -> RedisResult<i64> {
553        let command = SRemCommand::new(key, members);
554        self.execute_with_redirects(command).await
555    }
556
557    /// Get all members of a set
558    pub async fn smembers(
559        &self,
560        key: impl Into<String>,
561    ) -> RedisResult<std::collections::HashSet<String>> {
562        let command = SMembersCommand::new(key);
563        self.execute_with_redirects(command).await
564    }
565
566    /// Determine if a member is in a set
567    pub async fn sismember(
568        &self,
569        key: impl Into<String>,
570        member: impl Into<String>,
571    ) -> RedisResult<bool> {
572        let command = SIsMemberCommand::new(key, member);
573        self.execute_with_redirects(command).await
574    }
575
576    /// Get the number of members in a set
577    pub async fn scard(&self, key: impl Into<String>) -> RedisResult<i64> {
578        let command = SCardCommand::new(key);
579        self.execute_with_redirects(command).await
580    }
581
582    /// Remove and return a random member from a set
583    pub async fn spop(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
584        let command = SPopCommand::new(key);
585        self.execute_with_redirects(command).await
586    }
587
588    /// Get a random member from a set
589    pub async fn srandmember(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
590        let command = SRandMemberCommand::new(key);
591        self.execute_with_redirects(command).await
592    }
593
594    // Sorted Set operations
595
596    /// Add one or more members to a sorted set
597    pub async fn zadd(
598        &self,
599        key: impl Into<String>,
600        members: std::collections::HashMap<String, f64>,
601    ) -> RedisResult<i64> {
602        let command = ZAddCommand::new(key, members);
603        self.execute_with_redirects(command).await
604    }
605
606    /// Remove one or more members from a sorted set
607    pub async fn zrem(&self, key: impl Into<String>, members: Vec<String>) -> RedisResult<i64> {
608        let command = ZRemCommand::new(key, members);
609        self.execute_with_redirects(command).await
610    }
611
612    /// Get a range of members from a sorted set by index
613    pub async fn zrange(
614        &self,
615        key: impl Into<String>,
616        start: i64,
617        stop: i64,
618    ) -> RedisResult<Vec<String>> {
619        let command = ZRangeCommand::new(key, start, stop);
620        self.execute_with_redirects(command).await
621    }
622
623    /// Get the score of a member in a sorted set
624    pub async fn zscore(
625        &self,
626        key: impl Into<String>,
627        member: impl Into<String>,
628    ) -> RedisResult<Option<f64>> {
629        let command = ZScoreCommand::new(key, member);
630        self.execute_with_redirects(command).await
631    }
632
633    /// Get the number of members in a sorted set
634    pub async fn zcard(&self, key: impl Into<String>) -> RedisResult<i64> {
635        let command = ZCardCommand::new(key);
636        self.execute_with_redirects(command).await
637    }
638
639    /// Get the rank of a member in a sorted set (lowest to highest)
640    pub async fn zrank(
641        &self,
642        key: impl Into<String>,
643        member: impl Into<String>,
644    ) -> RedisResult<Option<i64>> {
645        let command = ZRankCommand::new(key, member);
646        self.execute_with_redirects(command).await
647    }
648
649    /// Get the rank of a member in a sorted set (highest to lowest)
650    pub async fn zrevrank(
651        &self,
652        key: impl Into<String>,
653        member: impl Into<String>,
654    ) -> RedisResult<Option<i64>> {
655        let command = ZRevRankCommand::new(key, member);
656        self.execute_with_redirects(command).await
657    }
658
659    /// Create a new pipeline for batching commands
660    ///
661    /// Pipeline allows you to send multiple commands to Redis in a single
662    /// network round-trip, which can significantly improve performance.
663    ///
664    /// # Examples
665    ///
666    /// ```no_run
667    /// use redis_oxide::{Client, ConnectionConfig};
668    ///
669    /// # #[tokio::main]
670    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
671    /// let config = ConnectionConfig::new("redis://localhost:6379");
672    /// let client = Client::connect(config).await?;
673    ///
674    /// let mut pipeline = client.pipeline();
675    /// pipeline.set("key1", "value1");
676    /// pipeline.set("key2", "value2");
677    /// pipeline.get("key1");
678    ///
679    /// let results = pipeline.execute().await?;
680    /// println!("Pipeline results: {:?}", results);
681    /// # Ok(())
682    /// # }
683    /// ```
684    pub fn pipeline(&self) -> Pipeline {
685        let client_executor = ClientPipelineExecutor {
686            client: self.clone(),
687        };
688        Pipeline::new(Arc::new(tokio::sync::Mutex::new(client_executor)))
689    }
690
691    /// Create a new transaction for atomic command execution
692    ///
693    /// Transactions allow you to execute multiple commands atomically using
694    /// MULTI/EXEC. You can also use WATCH to monitor keys for changes.
695    ///
696    /// # Examples
697    ///
698    /// ```no_run
699    /// use redis_oxide::{Client, ConnectionConfig};
700    ///
701    /// # #[tokio::main]
702    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
703    /// let config = ConnectionConfig::new("redis://localhost:6379");
704    /// let client = Client::connect(config).await?;
705    ///
706    /// let mut transaction = client.transaction().await?;
707    /// transaction.set("key1", "value1");
708    /// transaction.set("key2", "value2");
709    /// transaction.incr("counter");
710    ///
711    /// let results = transaction.exec().await?;
712    /// println!("Transaction results: {:?}", results);
713    /// # Ok(())
714    /// # }
715    /// ```
716    pub async fn transaction(&self) -> RedisResult<Transaction> {
717        let client_executor = ClientTransactionExecutor {
718            client: self.clone(),
719        };
720        Ok(Transaction::new(Arc::new(tokio::sync::Mutex::new(
721            client_executor,
722        ))))
723    }
724
725    /// Publish a message to a Redis channel
726    ///
727    /// Returns the number of subscribers that received the message.
728    ///
729    /// # Examples
730    ///
731    /// ```no_run
732    /// use redis_oxide::{Client, ConnectionConfig};
733    ///
734    /// # #[tokio::main]
735    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
736    /// let config = ConnectionConfig::new("redis://localhost:6379");
737    /// let client = Client::connect(config).await?;
738    ///
739    /// let subscribers = client.publish("news", "Breaking news!").await?;
740    /// println!("Message sent to {} subscribers", subscribers);
741    /// # Ok(())
742    /// # }
743    /// ```
744    pub async fn publish(
745        &self,
746        channel: impl Into<String>,
747        message: impl Into<String>,
748    ) -> RedisResult<i64> {
749        let channel = channel.into();
750        let message = message.into();
751
752        let args = vec![
753            RespValue::from(channel.as_str()),
754            RespValue::from(message.as_str()),
755        ];
756
757        match self.topology_type {
758            TopologyType::Standalone => {
759                if let Some(pool) = &self.standalone_pool {
760                    let result = pool.execute_command("PUBLISH".to_string(), args).await?;
761                    result.as_int()
762                } else {
763                    Err(RedisError::Connection(
764                        "No standalone pool available".to_string(),
765                    ))
766                }
767            }
768            TopologyType::Cluster => {
769                // For cluster, use any available node for PUBLISH
770                let pools = self.cluster_pools.read().await;
771                if let Some((_, pool)) = pools.iter().next() {
772                    let result = pool.execute_command("PUBLISH".to_string(), args).await?;
773                    result.as_int()
774                } else {
775                    Err(RedisError::Cluster(
776                        "No cluster nodes available".to_string(),
777                    ))
778                }
779            }
780        }
781    }
782
783    /// Create a new subscriber for receiving messages from Redis channels
784    ///
785    /// # Examples
786    ///
787    /// ```no_run
788    /// use redis_oxide::{Client, ConnectionConfig};
789    /// use futures::StreamExt;
790    ///
791    /// # #[tokio::main]
792    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
793    /// let config = ConnectionConfig::new("redis://localhost:6379");
794    /// let client = Client::connect(config).await?;
795    ///
796    /// let mut subscriber = client.subscriber().await?;
797    /// subscriber.subscribe(vec!["news".to_string()]).await?;
798    ///
799    /// while let Some(message) = subscriber.next_message().await? {
800    ///     println!("Received: {} on {}", message.payload, message.channel);
801    /// }
802    /// # Ok(())
803    /// # }
804    /// ```
805    pub async fn subscriber(&self) -> RedisResult<Subscriber> {
806        let client_connection = ClientPubSubConnection {
807            client: self.clone(),
808        };
809        Ok(Subscriber::new(Arc::new(tokio::sync::Mutex::new(
810            client_connection,
811        ))))
812    }
813
814    /// Create a new publisher for sending messages to Redis channels
815    ///
816    /// # Examples
817    ///
818    /// ```no_run
819    /// use redis_oxide::{Client, ConnectionConfig};
820    ///
821    /// # #[tokio::main]
822    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
823    /// let config = ConnectionConfig::new("redis://localhost:6379");
824    /// let client = Client::connect(config).await?;
825    ///
826    /// let publisher = client.publisher().await?;
827    /// let subscribers = publisher.publish("news", "Breaking news!").await?;
828    /// println!("Message sent to {} subscribers", subscribers);
829    /// # Ok(())
830    /// # }
831    /// ```
832    pub async fn publisher(&self) -> RedisResult<Publisher> {
833        let client_connection = ClientPubSubConnection {
834            client: self.clone(),
835        };
836        Ok(Publisher::new(Arc::new(tokio::sync::Mutex::new(
837            client_connection,
838        ))))
839    }
840
841    // Lua scripting methods
842
843    /// Execute a Lua script using EVAL
844    ///
845    /// # Arguments
846    ///
847    /// * `script` - The Lua script source code
848    /// * `keys` - List of Redis keys that the script will access (KEYS array in Lua)
849    /// * `args` - List of arguments to pass to the script (ARGV array in Lua)
850    ///
851    /// # Examples
852    ///
853    /// ```no_run
854    /// use redis_oxide::{Client, ConnectionConfig};
855    ///
856    /// # #[tokio::main]
857    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
858    /// let config = ConnectionConfig::new("redis://localhost:6379");
859    /// let client = Client::connect(config).await?;
860    ///
861    /// let script = "return redis.call('GET', KEYS[1])";
862    /// let result: Option<String> = client.eval(
863    ///     script,
864    ///     vec!["mykey".to_string()],
865    ///     vec![]
866    /// ).await?;
867    /// println!("Result: {:?}", result);
868    /// # Ok(())
869    /// # }
870    /// ```
871    pub async fn eval<T>(
872        &self,
873        script: &str,
874        keys: Vec<String>,
875        args: Vec<String>,
876    ) -> RedisResult<T>
877    where
878        T: std::convert::TryFrom<RespValue>,
879        T::Error: Into<RedisError>,
880    {
881        let mut cmd_args = vec![
882            RespValue::from(script),
883            RespValue::from(keys.len().to_string()),
884        ];
885
886        // Add keys
887        for key in keys {
888            cmd_args.push(RespValue::from(key));
889        }
890
891        // Add arguments
892        for arg in args {
893            cmd_args.push(RespValue::from(arg));
894        }
895
896        let result = match self.topology_type {
897            TopologyType::Standalone => {
898                if let Some(pool) = &self.standalone_pool {
899                    pool.execute_command("EVAL".to_string(), cmd_args).await?
900                } else {
901                    return Err(RedisError::Connection(
902                        "No standalone pool available".to_string(),
903                    ));
904                }
905            }
906            TopologyType::Cluster => {
907                // For cluster, use any available node for script execution
908                let pools = self.cluster_pools.read().await;
909                if let Some((_, pool)) = pools.iter().next() {
910                    pool.execute_command("EVAL".to_string(), cmd_args).await?
911                } else {
912                    return Err(RedisError::Cluster(
913                        "No cluster nodes available".to_string(),
914                    ));
915                }
916            }
917        };
918
919        T::try_from(result).map_err(Into::into)
920    }
921
922    /// Execute a Lua script using EVALSHA (script must be cached)
923    ///
924    /// # Arguments
925    ///
926    /// * `sha` - The SHA1 hash of the script
927    /// * `keys` - List of Redis keys that the script will access (KEYS array in Lua)
928    /// * `args` - List of arguments to pass to the script (ARGV array in Lua)
929    ///
930    /// # Examples
931    ///
932    /// ```no_run
933    /// use redis_oxide::{Client, ConnectionConfig};
934    ///
935    /// # #[tokio::main]
936    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
937    /// let config = ConnectionConfig::new("redis://localhost:6379");
938    /// let client = Client::connect(config).await?;
939    ///
940    /// // First load the script
941    /// let script = "return redis.call('GET', KEYS[1])";
942    /// let sha = client.script_load(script).await?;
943    ///
944    /// // Then execute using SHA
945    /// let result: Option<String> = client.evalsha(
946    ///     &sha,
947    ///     vec!["mykey".to_string()],
948    ///     vec![]
949    /// ).await?;
950    /// println!("Result: {:?}", result);
951    /// # Ok(())
952    /// # }
953    /// ```
954    pub async fn evalsha<T>(
955        &self,
956        sha: &str,
957        keys: Vec<String>,
958        args: Vec<String>,
959    ) -> RedisResult<T>
960    where
961        T: std::convert::TryFrom<RespValue>,
962        T::Error: Into<RedisError>,
963    {
964        let mut cmd_args = vec![
965            RespValue::from(sha),
966            RespValue::from(keys.len().to_string()),
967        ];
968
969        // Add keys
970        for key in keys {
971            cmd_args.push(RespValue::from(key));
972        }
973
974        // Add arguments
975        for arg in args {
976            cmd_args.push(RespValue::from(arg));
977        }
978
979        let result = match self.topology_type {
980            TopologyType::Standalone => {
981                if let Some(pool) = &self.standalone_pool {
982                    pool.execute_command("EVALSHA".to_string(), cmd_args)
983                        .await?
984                } else {
985                    return Err(RedisError::Connection(
986                        "No standalone pool available".to_string(),
987                    ));
988                }
989            }
990            TopologyType::Cluster => {
991                // For cluster, use any available node for script execution
992                let pools = self.cluster_pools.read().await;
993                if let Some((_, pool)) = pools.iter().next() {
994                    pool.execute_command("EVALSHA".to_string(), cmd_args)
995                        .await?
996                } else {
997                    return Err(RedisError::Cluster(
998                        "No cluster nodes available".to_string(),
999                    ));
1000                }
1001            }
1002        };
1003
1004        T::try_from(result).map_err(Into::into)
1005    }
1006
1007    /// Load a Lua script into Redis cache
1008    ///
1009    /// Returns the SHA1 hash of the script that can be used with EVALSHA.
1010    ///
1011    /// # Examples
1012    ///
1013    /// ```no_run
1014    /// use redis_oxide::{Client, ConnectionConfig};
1015    ///
1016    /// # #[tokio::main]
1017    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1018    /// let config = ConnectionConfig::new("redis://localhost:6379");
1019    /// let client = Client::connect(config).await?;
1020    ///
1021    /// let script = "return 'Hello, World!'";
1022    /// let sha = client.script_load(script).await?;
1023    /// println!("Script loaded with SHA: {}", sha);
1024    /// # Ok(())
1025    /// # }
1026    /// ```
1027    pub async fn script_load(&self, script: &str) -> RedisResult<String> {
1028        let result = match self.topology_type {
1029            TopologyType::Standalone => {
1030                if let Some(pool) = &self.standalone_pool {
1031                    pool.execute_command(
1032                        "SCRIPT".to_string(),
1033                        vec![RespValue::from("LOAD"), RespValue::from(script)],
1034                    )
1035                    .await?
1036                } else {
1037                    return Err(RedisError::Connection(
1038                        "No standalone pool available".to_string(),
1039                    ));
1040                }
1041            }
1042            TopologyType::Cluster => {
1043                // For cluster, load script on all nodes
1044                let pools = self.cluster_pools.read().await;
1045                let mut sha = String::new();
1046
1047                for (_, pool) in pools.iter() {
1048                    let result = pool
1049                        .execute_command(
1050                            "SCRIPT".to_string(),
1051                            vec![RespValue::from("LOAD"), RespValue::from(script)],
1052                        )
1053                        .await?;
1054                    sha = result.as_string()?;
1055                }
1056
1057                if sha.is_empty() {
1058                    return Err(RedisError::Cluster(
1059                        "No cluster nodes available".to_string(),
1060                    ));
1061                }
1062
1063                return Ok(sha);
1064            }
1065        };
1066
1067        result.as_string()
1068    }
1069
1070    /// Check if scripts exist in Redis cache
1071    ///
1072    /// # Examples
1073    ///
1074    /// ```no_run
1075    /// use redis_oxide::{Client, ConnectionConfig};
1076    ///
1077    /// # #[tokio::main]
1078    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1079    /// let config = ConnectionConfig::new("redis://localhost:6379");
1080    /// let client = Client::connect(config).await?;
1081    ///
1082    /// let script = "return 'Hello'";
1083    /// let sha = client.script_load(script).await?;
1084    ///
1085    /// let exists = client.script_exists(vec![sha]).await?;
1086    /// println!("Script exists: {:?}", exists);
1087    /// # Ok(())
1088    /// # }
1089    /// ```
1090    pub async fn script_exists(&self, shas: Vec<String>) -> RedisResult<Vec<bool>> {
1091        let mut cmd_args = vec![RespValue::from("EXISTS")];
1092        for sha in shas {
1093            cmd_args.push(RespValue::from(sha));
1094        }
1095
1096        let result = match self.topology_type {
1097            TopologyType::Standalone => {
1098                if let Some(pool) = &self.standalone_pool {
1099                    pool.execute_command("SCRIPT".to_string(), cmd_args).await?
1100                } else {
1101                    return Err(RedisError::Connection(
1102                        "No standalone pool available".to_string(),
1103                    ));
1104                }
1105            }
1106            TopologyType::Cluster => {
1107                // For cluster, check on any available node
1108                let pools = self.cluster_pools.read().await;
1109                if let Some((_, pool)) = pools.iter().next() {
1110                    pool.execute_command("SCRIPT".to_string(), cmd_args).await?
1111                } else {
1112                    return Err(RedisError::Cluster(
1113                        "No cluster nodes available".to_string(),
1114                    ));
1115                }
1116            }
1117        };
1118
1119        match result {
1120            RespValue::Array(items) => {
1121                let mut exists = Vec::new();
1122                for item in items {
1123                    match item {
1124                        RespValue::Integer(1) => exists.push(true),
1125                        RespValue::Integer(0) => exists.push(false),
1126                        _ => {
1127                            return Err(RedisError::Type(format!(
1128                                "Unexpected response in SCRIPT EXISTS: {:?}",
1129                                item
1130                            )))
1131                        }
1132                    }
1133                }
1134                Ok(exists)
1135            }
1136            _ => Err(RedisError::Type(format!(
1137                "Unexpected response type for SCRIPT EXISTS: {:?}",
1138                result
1139            ))),
1140        }
1141    }
1142
1143    /// Flush all scripts from Redis cache
1144    ///
1145    /// # Examples
1146    ///
1147    /// ```no_run
1148    /// use redis_oxide::{Client, ConnectionConfig};
1149    ///
1150    /// # #[tokio::main]
1151    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1152    /// let config = ConnectionConfig::new("redis://localhost:6379");
1153    /// let client = Client::connect(config).await?;
1154    ///
1155    /// client.script_flush().await?;
1156    /// println!("All scripts flushed from cache");
1157    /// # Ok(())
1158    /// # }
1159    /// ```
1160    pub async fn script_flush(&self) -> RedisResult<()> {
1161        let cmd_args = vec![RespValue::from("FLUSH")];
1162
1163        match self.topology_type {
1164            TopologyType::Standalone => {
1165                if let Some(pool) = &self.standalone_pool {
1166                    let _result = pool.execute_command("SCRIPT".to_string(), cmd_args).await?;
1167                    Ok(())
1168                } else {
1169                    Err(RedisError::Connection(
1170                        "No standalone pool available".to_string(),
1171                    ))
1172                }
1173            }
1174            TopologyType::Cluster => {
1175                // For cluster, flush scripts on all nodes
1176                let pools = self.cluster_pools.read().await;
1177                for (_, pool) in pools.iter() {
1178                    let _result = pool
1179                        .execute_command("SCRIPT".to_string(), cmd_args.clone())
1180                        .await?;
1181                }
1182                Ok(())
1183            }
1184        }
1185    }
1186
1187    // Redis Streams methods
1188
1189    /// Add an entry to a stream using XADD
1190    ///
1191    /// # Arguments
1192    ///
1193    /// * `stream` - The name of the stream
1194    /// * `id` - The entry ID ("*" for auto-generation)
1195    /// * `fields` - The field-value pairs for the entry
1196    ///
1197    /// # Examples
1198    ///
1199    /// ```no_run
1200    /// use redis_oxide::{Client, ConnectionConfig};
1201    /// use std::collections::HashMap;
1202    ///
1203    /// # #[tokio::main]
1204    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1205    /// let config = ConnectionConfig::new("redis://localhost:6379");
1206    /// let client = Client::connect(config).await?;
1207    ///
1208    /// let mut fields = HashMap::new();
1209    /// fields.insert("user_id".to_string(), "123".to_string());
1210    /// fields.insert("action".to_string(), "login".to_string());
1211    ///
1212    /// let entry_id = client.xadd("events", "*", fields).await?;
1213    /// println!("Added entry: {}", entry_id);
1214    /// # Ok(())
1215    /// # }
1216    /// ```
1217    pub async fn xadd(
1218        &self,
1219        stream: impl Into<String>,
1220        id: impl Into<String>,
1221        fields: std::collections::HashMap<String, String>,
1222    ) -> RedisResult<String> {
1223        let stream = stream.into();
1224        let id = id.into();
1225
1226        let mut cmd_args = vec![RespValue::from(stream.clone()), RespValue::from(id)];
1227
1228        // Add field-value pairs
1229        for (field, value) in fields {
1230            cmd_args.push(RespValue::from(field));
1231            cmd_args.push(RespValue::from(value));
1232        }
1233
1234        let result = match self.topology_type {
1235            TopologyType::Standalone => {
1236                if let Some(pool) = &self.standalone_pool {
1237                    pool.execute_command("XADD".to_string(), cmd_args).await?
1238                } else {
1239                    return Err(RedisError::Connection(
1240                        "No standalone pool available".to_string(),
1241                    ));
1242                }
1243            }
1244            TopologyType::Cluster => {
1245                // For cluster, use the stream name to determine the slot
1246                let slot = calculate_slot(stream.as_bytes());
1247
1248                // Try to get node from topology
1249                let node_key = if let Some(ref topology) = self.cluster_topology {
1250                    if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1251                        Some(format!("{}:{}", host, port))
1252                    } else {
1253                        None
1254                    }
1255                } else {
1256                    None
1257                };
1258
1259                if let Some(node_key) = node_key {
1260                    if let Some(pool) = self.get_cluster_pool(&node_key).await {
1261                        pool.execute_command("XADD".to_string(), cmd_args).await?
1262                    } else {
1263                        return Err(RedisError::Cluster(format!(
1264                            "Pool not found for node: {}",
1265                            node_key
1266                        )));
1267                    }
1268                } else {
1269                    return Err(RedisError::Cluster(format!(
1270                        "No node found for slot: {}",
1271                        slot
1272                    )));
1273                }
1274            }
1275        };
1276
1277        result.as_string()
1278    }
1279
1280    /// Read entries from one or more streams using XREAD
1281    ///
1282    /// # Arguments
1283    ///
1284    /// * `streams` - Vector of (stream_name, last_id) pairs
1285    /// * `count` - Maximum number of entries per stream (None for no limit)
1286    /// * `block` - Block timeout (None for non-blocking)
1287    ///
1288    /// # Examples
1289    ///
1290    /// ```no_run
1291    /// use redis_oxide::{Client, ConnectionConfig};
1292    ///
1293    /// # #[tokio::main]
1294    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1295    /// let config = ConnectionConfig::new("redis://localhost:6379");
1296    /// let client = Client::connect(config).await?;
1297    ///
1298    /// // Non-blocking read
1299    /// let streams = vec![("events".to_string(), "$".to_string())];
1300    /// let messages = client.xread(streams, Some(10), None).await?;
1301    ///
1302    /// for (stream, entries) in messages {
1303    ///     for entry in entries {
1304    ///         println!("Stream {}: {} -> {:?}", stream, entry.id, entry.fields);
1305    ///     }
1306    /// }
1307    /// # Ok(())
1308    /// # }
1309    /// ```
1310    pub async fn xread(
1311        &self,
1312        streams: Vec<(String, String)>,
1313        count: Option<u64>,
1314        block: Option<Duration>,
1315    ) -> RedisResult<std::collections::HashMap<String, Vec<crate::streams::StreamEntry>>> {
1316        let mut cmd_args = vec![];
1317
1318        // Add COUNT option
1319        if let Some(count) = count {
1320            cmd_args.push(RespValue::from("COUNT"));
1321            cmd_args.push(RespValue::from(count.to_string()));
1322        }
1323
1324        // Add BLOCK option
1325        if let Some(block) = block {
1326            cmd_args.push(RespValue::from("BLOCK"));
1327            cmd_args.push(RespValue::from(block.as_millis().to_string()));
1328        }
1329
1330        // Add STREAMS keyword
1331        cmd_args.push(RespValue::from("STREAMS"));
1332
1333        // Add stream names
1334        for (stream, _) in &streams {
1335            cmd_args.push(RespValue::from(stream.clone()));
1336        }
1337
1338        // Add stream IDs
1339        for (_, id) in &streams {
1340            cmd_args.push(RespValue::from(id.clone()));
1341        }
1342
1343        let result = match self.topology_type {
1344            TopologyType::Standalone => {
1345                if let Some(pool) = &self.standalone_pool {
1346                    pool.execute_command("XREAD".to_string(), cmd_args).await?
1347                } else {
1348                    return Err(RedisError::Connection(
1349                        "No standalone pool available".to_string(),
1350                    ));
1351                }
1352            }
1353            TopologyType::Cluster => {
1354                // For cluster, use any available node (XREAD can read from multiple streams)
1355                let pools = self.cluster_pools.read().await;
1356                if let Some((_, pool)) = pools.iter().next() {
1357                    pool.execute_command("XREAD".to_string(), cmd_args).await?
1358                } else {
1359                    return Err(RedisError::Cluster(
1360                        "No cluster nodes available".to_string(),
1361                    ));
1362                }
1363            }
1364        };
1365
1366        crate::streams::parse_xread_response(result)
1367    }
1368
1369    /// Read entries from a stream range using XRANGE
1370    ///
1371    /// # Arguments
1372    ///
1373    /// * `stream` - The name of the stream
1374    /// * `start` - Start ID (inclusive, "-" for beginning)
1375    /// * `end` - End ID (inclusive, "+" for end)
1376    /// * `count` - Maximum number of entries to return
1377    ///
1378    /// # Examples
1379    ///
1380    /// ```no_run
1381    /// use redis_oxide::{Client, ConnectionConfig};
1382    ///
1383    /// # #[tokio::main]
1384    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1385    /// let config = ConnectionConfig::new("redis://localhost:6379");
1386    /// let client = Client::connect(config).await?;
1387    ///
1388    /// // Get all entries
1389    /// let entries = client.xrange("events", "-", "+", None).await?;
1390    /// for entry in entries {
1391    ///     println!("Entry {}: {:?}", entry.id, entry.fields);
1392    /// }
1393    ///
1394    /// // Get last 10 entries
1395    /// let recent = client.xrange("events", "-", "+", Some(10)).await?;
1396    /// # Ok(())
1397    /// # }
1398    /// ```
1399    pub async fn xrange(
1400        &self,
1401        stream: impl Into<String>,
1402        start: impl Into<String>,
1403        end: impl Into<String>,
1404        count: Option<u64>,
1405    ) -> RedisResult<Vec<crate::streams::StreamEntry>> {
1406        let stream = stream.into();
1407        let mut cmd_args = vec![
1408            RespValue::from(stream.clone()),
1409            RespValue::from(start.into()),
1410            RespValue::from(end.into()),
1411        ];
1412
1413        if let Some(count) = count {
1414            cmd_args.push(RespValue::from("COUNT"));
1415            cmd_args.push(RespValue::from(count.to_string()));
1416        }
1417
1418        let result = match self.topology_type {
1419            TopologyType::Standalone => {
1420                if let Some(pool) = &self.standalone_pool {
1421                    pool.execute_command("XRANGE".to_string(), cmd_args).await?
1422                } else {
1423                    return Err(RedisError::Connection(
1424                        "No standalone pool available".to_string(),
1425                    ));
1426                }
1427            }
1428            TopologyType::Cluster => {
1429                // For cluster, use the stream name to determine the slot
1430                let slot = calculate_slot(stream.as_bytes());
1431                // Try to get node from topology
1432                let node_key = if let Some(ref topology) = self.cluster_topology {
1433                    if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1434                        Some(format!("{}:{}", host, port))
1435                    } else {
1436                        None
1437                    }
1438                } else {
1439                    None
1440                };
1441
1442                if let Some(node_key) = node_key {
1443                    if let Some(pool) = self.get_cluster_pool(&node_key).await {
1444                        pool.execute_command("XRANGE".to_string(), cmd_args).await?
1445                    } else {
1446                        return Err(RedisError::Cluster(format!(
1447                            "Pool not found for node: {}",
1448                            node_key
1449                        )));
1450                    }
1451                } else {
1452                    return Err(RedisError::Cluster(format!(
1453                        "No node found for slot: {}",
1454                        slot
1455                    )));
1456                }
1457            }
1458        };
1459
1460        crate::streams::parse_stream_entries(result)
1461    }
1462
1463    /// Get the length of a stream using XLEN
1464    ///
1465    /// # Examples
1466    ///
1467    /// ```no_run
1468    /// use redis_oxide::{Client, ConnectionConfig};
1469    ///
1470    /// # #[tokio::main]
1471    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1472    /// let config = ConnectionConfig::new("redis://localhost:6379");
1473    /// let client = Client::connect(config).await?;
1474    ///
1475    /// let length = client.xlen("events").await?;
1476    /// println!("Stream has {} entries", length);
1477    /// # Ok(())
1478    /// # }
1479    /// ```
1480    pub async fn xlen(&self, stream: impl Into<String>) -> RedisResult<u64> {
1481        let stream = stream.into();
1482        let cmd_args = vec![RespValue::from(stream.clone())];
1483
1484        let result = match self.topology_type {
1485            TopologyType::Standalone => {
1486                if let Some(pool) = &self.standalone_pool {
1487                    pool.execute_command("XLEN".to_string(), cmd_args).await?
1488                } else {
1489                    return Err(RedisError::Connection(
1490                        "No standalone pool available".to_string(),
1491                    ));
1492                }
1493            }
1494            TopologyType::Cluster => {
1495                // For cluster, use the stream name to determine the slot
1496                let slot = calculate_slot(stream.as_bytes());
1497                // Try to get node from topology
1498                let node_key = if let Some(ref topology) = self.cluster_topology {
1499                    if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1500                        Some(format!("{}:{}", host, port))
1501                    } else {
1502                        None
1503                    }
1504                } else {
1505                    None
1506                };
1507
1508                if let Some(node_key) = node_key {
1509                    if let Some(pool) = self.get_cluster_pool(&node_key).await {
1510                        pool.execute_command("XLEN".to_string(), cmd_args).await?
1511                    } else {
1512                        return Err(RedisError::Cluster(format!(
1513                            "Pool not found for node: {}",
1514                            node_key
1515                        )));
1516                    }
1517                } else {
1518                    return Err(RedisError::Cluster(format!(
1519                        "No node found for slot: {}",
1520                        slot
1521                    )));
1522                }
1523            }
1524        };
1525
1526        Ok(result.as_int()? as u64)
1527    }
1528
1529    /// Create a consumer group using XGROUP CREATE
1530    ///
1531    /// # Arguments
1532    ///
1533    /// * `stream` - The name of the stream
1534    /// * `group` - The name of the consumer group
1535    /// * `id` - The starting ID for the group ("$" for latest, "0" for beginning)
1536    /// * `mkstream` - Create the stream if it doesn't exist
1537    ///
1538    /// # Examples
1539    ///
1540    /// ```no_run
1541    /// use redis_oxide::{Client, ConnectionConfig};
1542    ///
1543    /// # #[tokio::main]
1544    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1545    /// let config = ConnectionConfig::new("redis://localhost:6379");
1546    /// let client = Client::connect(config).await?;
1547    ///
1548    /// // Create a consumer group starting from the latest messages
1549    /// client.xgroup_create("events", "processors", "$", true).await?;
1550    /// println!("Consumer group created");
1551    /// # Ok(())
1552    /// # }
1553    /// ```
1554    pub async fn xgroup_create(
1555        &self,
1556        stream: impl Into<String>,
1557        group: impl Into<String>,
1558        id: impl Into<String>,
1559        mkstream: bool,
1560    ) -> RedisResult<()> {
1561        let stream = stream.into();
1562        let mut cmd_args = vec![
1563            RespValue::from("CREATE"),
1564            RespValue::from(stream.clone()),
1565            RespValue::from(group.into()),
1566            RespValue::from(id.into()),
1567        ];
1568
1569        if mkstream {
1570            cmd_args.push(RespValue::from("MKSTREAM"));
1571        }
1572
1573        let result = match self.topology_type {
1574            TopologyType::Standalone => {
1575                if let Some(pool) = &self.standalone_pool {
1576                    pool.execute_command("XGROUP".to_string(), cmd_args).await?
1577                } else {
1578                    return Err(RedisError::Connection(
1579                        "No standalone pool available".to_string(),
1580                    ));
1581                }
1582            }
1583            TopologyType::Cluster => {
1584                // For cluster, use the stream name to determine the slot
1585                let slot = calculate_slot(stream.as_bytes());
1586                // Try to get node from topology
1587                let node_key = if let Some(ref topology) = self.cluster_topology {
1588                    if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1589                        Some(format!("{}:{}", host, port))
1590                    } else {
1591                        None
1592                    }
1593                } else {
1594                    None
1595                };
1596
1597                if let Some(node_key) = node_key {
1598                    if let Some(pool) = self.get_cluster_pool(&node_key).await {
1599                        pool.execute_command("XGROUP".to_string(), cmd_args).await?
1600                    } else {
1601                        return Err(RedisError::Cluster(format!(
1602                            "Pool not found for node: {}",
1603                            node_key
1604                        )));
1605                    }
1606                } else {
1607                    return Err(RedisError::Cluster(format!(
1608                        "No node found for slot: {}",
1609                        slot
1610                    )));
1611                }
1612            }
1613        };
1614
1615        // Expect "OK" response
1616        match result.as_string()?.as_str() {
1617            "OK" => Ok(()),
1618            other => Err(RedisError::Protocol(format!(
1619                "Unexpected XGROUP CREATE response: {}",
1620                other
1621            ))),
1622        }
1623    }
1624
1625    /// Read from a consumer group using XREADGROUP
1626    ///
1627    /// # Arguments
1628    ///
1629    /// * `group` - The consumer group name
1630    /// * `consumer` - The consumer name
1631    /// * `streams` - Vector of (stream_name, id) pairs (">" for new messages)
1632    /// * `count` - Maximum number of entries per stream
1633    /// * `block` - Block timeout
1634    ///
1635    /// # Examples
1636    ///
1637    /// ```no_run
1638    /// use redis_oxide::{Client, ConnectionConfig};
1639    ///
1640    /// # #[tokio::main]
1641    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1642    /// let config = ConnectionConfig::new("redis://localhost:6379");
1643    /// let client = Client::connect(config).await?;
1644    ///
1645    /// // Read new messages from the group
1646    /// let streams = vec![("events".to_string(), ">".to_string())];
1647    /// let messages = client.xreadgroup(
1648    ///     "processors",
1649    ///     "worker-1",
1650    ///     streams,
1651    ///     Some(1),
1652    ///     Some(std::time::Duration::from_secs(1))
1653    /// ).await?;
1654    ///
1655    /// for (stream, entries) in messages {
1656    ///     for entry in entries {
1657    ///         println!("Processing {}: {:?}", entry.id, entry.fields);
1658    ///         // Acknowledge the message after processing
1659    ///         client.xack(&stream, "processors", vec![entry.id]).await?;
1660    ///     }
1661    /// }
1662    /// # Ok(())
1663    /// # }
1664    /// ```
1665    pub async fn xreadgroup(
1666        &self,
1667        group: impl Into<String>,
1668        consumer: impl Into<String>,
1669        streams: Vec<(String, String)>,
1670        count: Option<u64>,
1671        block: Option<Duration>,
1672    ) -> RedisResult<std::collections::HashMap<String, Vec<crate::streams::StreamEntry>>> {
1673        let mut cmd_args = vec![
1674            RespValue::from("GROUP"),
1675            RespValue::from(group.into()),
1676            RespValue::from(consumer.into()),
1677        ];
1678
1679        // Add COUNT option
1680        if let Some(count) = count {
1681            cmd_args.push(RespValue::from("COUNT"));
1682            cmd_args.push(RespValue::from(count.to_string()));
1683        }
1684
1685        // Add BLOCK option
1686        if let Some(block) = block {
1687            cmd_args.push(RespValue::from("BLOCK"));
1688            cmd_args.push(RespValue::from(block.as_millis().to_string()));
1689        }
1690
1691        // Add STREAMS keyword
1692        cmd_args.push(RespValue::from("STREAMS"));
1693
1694        // Add stream names
1695        for (stream, _) in &streams {
1696            cmd_args.push(RespValue::from(stream.clone()));
1697        }
1698
1699        // Add stream IDs
1700        for (_, id) in &streams {
1701            cmd_args.push(RespValue::from(id.clone()));
1702        }
1703
1704        let result = match self.topology_type {
1705            TopologyType::Standalone => {
1706                if let Some(pool) = &self.standalone_pool {
1707                    pool.execute_command("XREADGROUP".to_string(), cmd_args)
1708                        .await?
1709                } else {
1710                    return Err(RedisError::Connection(
1711                        "No standalone pool available".to_string(),
1712                    ));
1713                }
1714            }
1715            TopologyType::Cluster => {
1716                // For cluster, use any available node
1717                let pools = self.cluster_pools.read().await;
1718                if let Some((_, pool)) = pools.iter().next() {
1719                    pool.execute_command("XREADGROUP".to_string(), cmd_args)
1720                        .await?
1721                } else {
1722                    return Err(RedisError::Cluster(
1723                        "No cluster nodes available".to_string(),
1724                    ));
1725                }
1726            }
1727        };
1728
1729        crate::streams::parse_xread_response(result)
1730    }
1731
1732    /// Acknowledge messages using XACK
1733    ///
1734    /// # Arguments
1735    ///
1736    /// * `stream` - The stream name
1737    /// * `group` - The consumer group name
1738    /// * `ids` - Vector of message IDs to acknowledge
1739    ///
1740    /// # Examples
1741    ///
1742    /// ```no_run
1743    /// use redis_oxide::{Client, ConnectionConfig};
1744    ///
1745    /// # #[tokio::main]
1746    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1747    /// let config = ConnectionConfig::new("redis://localhost:6379");
1748    /// let client = Client::connect(config).await?;
1749    ///
1750    /// // Acknowledge processed messages
1751    /// let acked = client.xack("events", "processors", vec![
1752    ///     "1234567890123-0".to_string(),
1753    ///     "1234567890124-0".to_string(),
1754    /// ]).await?;
1755    /// println!("Acknowledged {} messages", acked);
1756    /// # Ok(())
1757    /// # }
1758    /// ```
1759    pub async fn xack(
1760        &self,
1761        stream: impl Into<String>,
1762        group: impl Into<String>,
1763        ids: Vec<String>,
1764    ) -> RedisResult<u64> {
1765        let stream = stream.into();
1766        let mut cmd_args = vec![
1767            RespValue::from(stream.clone()),
1768            RespValue::from(group.into()),
1769        ];
1770
1771        for id in ids {
1772            cmd_args.push(RespValue::from(id));
1773        }
1774
1775        let result = match self.topology_type {
1776            TopologyType::Standalone => {
1777                if let Some(pool) = &self.standalone_pool {
1778                    pool.execute_command("XACK".to_string(), cmd_args).await?
1779                } else {
1780                    return Err(RedisError::Connection(
1781                        "No standalone pool available".to_string(),
1782                    ));
1783                }
1784            }
1785            TopologyType::Cluster => {
1786                // For cluster, use the stream name to determine the slot
1787                let slot = calculate_slot(stream.as_bytes());
1788                // Try to get node from topology
1789                let node_key = if let Some(ref topology) = self.cluster_topology {
1790                    if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1791                        Some(format!("{}:{}", host, port))
1792                    } else {
1793                        None
1794                    }
1795                } else {
1796                    None
1797                };
1798
1799                if let Some(node_key) = node_key {
1800                    if let Some(pool) = self.get_cluster_pool(&node_key).await {
1801                        pool.execute_command("XACK".to_string(), cmd_args).await?
1802                    } else {
1803                        return Err(RedisError::Cluster(format!(
1804                            "Pool not found for node: {}",
1805                            node_key
1806                        )));
1807                    }
1808                } else {
1809                    return Err(RedisError::Cluster(format!(
1810                        "No node found for slot: {}",
1811                        slot
1812                    )));
1813                }
1814            }
1815        };
1816
1817        Ok(result.as_int()? as u64)
1818    }
1819
1820    /// Get the topology type
1821    pub fn topology_type(&self) -> TopologyType {
1822        self.topology_type
1823    }
1824}
1825
1826/// Pipeline executor implementation for Client
1827struct ClientPipelineExecutor {
1828    client: Client,
1829}
1830
1831#[async_trait::async_trait]
1832impl PipelineExecutor for ClientPipelineExecutor {
1833    async fn execute_pipeline(
1834        &mut self,
1835        commands: Vec<Box<dyn PipelineCommand>>,
1836    ) -> RedisResult<Vec<RespValue>> {
1837        if commands.is_empty() {
1838            return Ok(Vec::new());
1839        }
1840
1841        // For pipeline execution, we need to send all commands in one batch
1842        // We'll use the first command to determine the target node (for cluster mode)
1843        let first_command = &commands[0];
1844        let first_key = first_command.key();
1845
1846        match self.client.topology_type {
1847            TopologyType::Standalone => {
1848                // For standalone, execute all commands on the single connection
1849                if let Some(pool) = &self.client.standalone_pool {
1850                    self.execute_pipeline_on_pool(pool, commands).await
1851                } else {
1852                    Err(RedisError::Connection(
1853                        "No standalone pool available".to_string(),
1854                    ))
1855                }
1856            }
1857            TopologyType::Cluster => {
1858                // For cluster mode, we need to group commands by their target slots
1859                // For simplicity in this initial implementation, we'll execute on the node
1860                // determined by the first command's key
1861                if let Some(key) = first_key {
1862                    let slot = calculate_slot(key.as_bytes());
1863                    let node_addr = self.get_node_for_slot(slot).await?;
1864                    let pool = self.get_or_create_pool(&node_addr).await?;
1865                    self.execute_pipeline_on_pool(&pool, commands).await
1866                } else {
1867                    // If no key, use any available node
1868                    let pools = self.client.cluster_pools.read().await;
1869                    if let Some((_, pool)) = pools.iter().next() {
1870                        self.execute_pipeline_on_pool(pool, commands).await
1871                    } else {
1872                        Err(RedisError::Cluster(
1873                            "No cluster nodes available".to_string(),
1874                        ))
1875                    }
1876                }
1877            }
1878        }
1879    }
1880}
1881
1882impl ClientPipelineExecutor {
1883    /// Execute pipeline commands on a specific pool
1884    async fn execute_pipeline_on_pool(
1885        &self,
1886        pool: &Arc<Pool>,
1887        commands: Vec<Box<dyn PipelineCommand>>,
1888    ) -> RedisResult<Vec<RespValue>> {
1889        // Build the pipeline command array
1890        let mut pipeline_args = Vec::new();
1891
1892        for command in commands {
1893            let mut cmd_args = vec![RespValue::from(command.name())];
1894            cmd_args.extend(command.args());
1895            pipeline_args.push(RespValue::Array(cmd_args));
1896        }
1897
1898        // Execute all commands in the pipeline
1899        let mut results = Vec::new();
1900        for cmd_array in pipeline_args {
1901            if let RespValue::Array(args) = cmd_array {
1902                if let Some(RespValue::BulkString(cmd_name)) = args.first() {
1903                    let command = String::from_utf8_lossy(cmd_name).to_string();
1904                    let cmd_args = args.into_iter().skip(1).collect();
1905
1906                    // For now, execute commands sequentially
1907                    // TODO: Implement true pipelining at the protocol level
1908                    let result = pool.execute_command(command, cmd_args).await?;
1909                    results.push(result);
1910                } else if let Some(RespValue::SimpleString(cmd_name)) = args.first() {
1911                    let command = cmd_name.clone();
1912                    let cmd_args = args.into_iter().skip(1).collect();
1913
1914                    let result = pool.execute_command(command, cmd_args).await?;
1915                    results.push(result);
1916                }
1917            }
1918        }
1919
1920        Ok(results)
1921    }
1922
1923    /// Get the node address for a given slot (cluster mode)
1924    async fn get_node_for_slot(&self, slot: u16) -> RedisResult<String> {
1925        if let Some(topology) = &self.client.cluster_topology {
1926            if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1927                Ok(format!("{}:{}", host, port))
1928            } else {
1929                Err(RedisError::Cluster(format!(
1930                    "No node found for slot {}",
1931                    slot
1932                )))
1933            }
1934        } else {
1935            Err(RedisError::Cluster(
1936                "No cluster topology available".to_string(),
1937            ))
1938        }
1939    }
1940
1941    /// Get or create a pool for the given node address
1942    async fn get_or_create_pool(&self, node_addr: &str) -> RedisResult<Arc<Pool>> {
1943        let pools = self.client.cluster_pools.read().await;
1944        if let Some(pool) = pools.get(node_addr) {
1945            Ok(pool.clone())
1946        } else {
1947            drop(pools);
1948
1949            // Create new pool for this node
1950            let mut pools = self.client.cluster_pools.write().await;
1951
1952            // Double-check after acquiring write lock
1953            if let Some(pool) = pools.get(node_addr) {
1954                return Ok(pool.clone());
1955            }
1956
1957            // Parse node address
1958            let parts: Vec<&str> = node_addr.split(':').collect();
1959            if parts.len() != 2 {
1960                return Err(RedisError::Config(format!(
1961                    "Invalid node address: {}",
1962                    node_addr
1963                )));
1964            }
1965
1966            let host = parts[0];
1967            let port: u16 = parts[1].parse().map_err(|_| {
1968                RedisError::Config(format!("Invalid port in address: {}", node_addr))
1969            })?;
1970
1971            // Create config for this node
1972            let node_config = self.client.config.clone();
1973
1974            let pool = Arc::new(Pool::new(node_config, host.to_string(), port).await?);
1975            pools.insert(node_addr.to_string(), pool.clone());
1976
1977            Ok(pool)
1978        }
1979    }
1980}
1981
1982/// Transaction executor implementation for Client
1983struct ClientTransactionExecutor {
1984    client: Client,
1985}
1986
1987#[async_trait::async_trait]
1988impl TransactionExecutor for ClientTransactionExecutor {
1989    async fn multi(&mut self) -> RedisResult<()> {
1990        // Execute MULTI command
1991        match self.client.topology_type {
1992            TopologyType::Standalone => {
1993                if let Some(pool) = &self.client.standalone_pool {
1994                    let _result = pool.execute_command("MULTI".to_string(), vec![]).await?;
1995                    Ok(())
1996                } else {
1997                    Err(RedisError::Connection(
1998                        "No standalone pool available".to_string(),
1999                    ))
2000                }
2001            }
2002            TopologyType::Cluster => {
2003                // For cluster, use any available node for transaction
2004                let pools = self.client.cluster_pools.read().await;
2005                if let Some((_, pool)) = pools.iter().next() {
2006                    let _result = pool.execute_command("MULTI".to_string(), vec![]).await?;
2007                    Ok(())
2008                } else {
2009                    Err(RedisError::Cluster(
2010                        "No cluster nodes available".to_string(),
2011                    ))
2012                }
2013            }
2014        }
2015    }
2016
2017    async fn queue_command(&mut self, command: Box<dyn TransactionCommand>) -> RedisResult<()> {
2018        // Execute the command (it will be queued by Redis after MULTI)
2019        let cmd_name = command.name().to_string();
2020        let cmd_args = command.args();
2021
2022        match self.client.topology_type {
2023            TopologyType::Standalone => {
2024                if let Some(pool) = &self.client.standalone_pool {
2025                    let _result = pool.execute_command(cmd_name, cmd_args).await?;
2026                    Ok(())
2027                } else {
2028                    Err(RedisError::Connection(
2029                        "No standalone pool available".to_string(),
2030                    ))
2031                }
2032            }
2033            TopologyType::Cluster => {
2034                // For cluster, use the node determined by the command's key
2035                if let Some(key) = command.key() {
2036                    let slot = calculate_slot(key.as_bytes());
2037                    let node_addr = self.get_node_for_slot(slot).await?;
2038                    let pool = self.get_or_create_pool(&node_addr).await?;
2039                    let _result = pool.execute_command(cmd_name, cmd_args).await?;
2040                    Ok(())
2041                } else {
2042                    // If no key, use any available node
2043                    let pools = self.client.cluster_pools.read().await;
2044                    if let Some((_, pool)) = pools.iter().next() {
2045                        let _result = pool.execute_command(cmd_name, cmd_args).await?;
2046                        Ok(())
2047                    } else {
2048                        Err(RedisError::Cluster(
2049                            "No cluster nodes available".to_string(),
2050                        ))
2051                    }
2052                }
2053            }
2054        }
2055    }
2056
2057    async fn exec(&mut self) -> RedisResult<Vec<RespValue>> {
2058        // Execute EXEC command
2059        match self.client.topology_type {
2060            TopologyType::Standalone => {
2061                if let Some(pool) = &self.client.standalone_pool {
2062                    let result = pool.execute_command("EXEC".to_string(), vec![]).await?;
2063                    match result {
2064                        RespValue::Array(results) => Ok(results),
2065                        RespValue::Null => Ok(vec![]), // Transaction was discarded (watched key changed)
2066                        _ => Err(RedisError::Type(format!(
2067                            "Unexpected EXEC response: {:?}",
2068                            result
2069                        ))),
2070                    }
2071                } else {
2072                    Err(RedisError::Connection(
2073                        "No standalone pool available".to_string(),
2074                    ))
2075                }
2076            }
2077            TopologyType::Cluster => {
2078                let pools = self.client.cluster_pools.read().await;
2079                if let Some((_, pool)) = pools.iter().next() {
2080                    let result = pool.execute_command("EXEC".to_string(), vec![]).await?;
2081                    match result {
2082                        RespValue::Array(results) => Ok(results),
2083                        RespValue::Null => Ok(vec![]), // Transaction was discarded (watched key changed)
2084                        _ => Err(RedisError::Type(format!(
2085                            "Unexpected EXEC response: {:?}",
2086                            result
2087                        ))),
2088                    }
2089                } else {
2090                    Err(RedisError::Cluster(
2091                        "No cluster nodes available".to_string(),
2092                    ))
2093                }
2094            }
2095        }
2096    }
2097
2098    async fn discard(&mut self) -> RedisResult<()> {
2099        // Execute DISCARD command
2100        match self.client.topology_type {
2101            TopologyType::Standalone => {
2102                if let Some(pool) = &self.client.standalone_pool {
2103                    let _result = pool.execute_command("DISCARD".to_string(), vec![]).await?;
2104                    Ok(())
2105                } else {
2106                    Err(RedisError::Connection(
2107                        "No standalone pool available".to_string(),
2108                    ))
2109                }
2110            }
2111            TopologyType::Cluster => {
2112                let pools = self.client.cluster_pools.read().await;
2113                if let Some((_, pool)) = pools.iter().next() {
2114                    let _result = pool.execute_command("DISCARD".to_string(), vec![]).await?;
2115                    Ok(())
2116                } else {
2117                    Err(RedisError::Cluster(
2118                        "No cluster nodes available".to_string(),
2119                    ))
2120                }
2121            }
2122        }
2123    }
2124
2125    async fn watch(&mut self, keys: Vec<String>) -> RedisResult<()> {
2126        // Execute WATCH command
2127        let mut args = vec![];
2128        for key in keys {
2129            args.push(RespValue::from(key));
2130        }
2131
2132        match self.client.topology_type {
2133            TopologyType::Standalone => {
2134                if let Some(pool) = &self.client.standalone_pool {
2135                    let _result = pool.execute_command("WATCH".to_string(), args).await?;
2136                    Ok(())
2137                } else {
2138                    Err(RedisError::Connection(
2139                        "No standalone pool available".to_string(),
2140                    ))
2141                }
2142            }
2143            TopologyType::Cluster => {
2144                let pools = self.client.cluster_pools.read().await;
2145                if let Some((_, pool)) = pools.iter().next() {
2146                    let _result = pool.execute_command("WATCH".to_string(), args).await?;
2147                    Ok(())
2148                } else {
2149                    Err(RedisError::Cluster(
2150                        "No cluster nodes available".to_string(),
2151                    ))
2152                }
2153            }
2154        }
2155    }
2156
2157    async fn unwatch(&mut self) -> RedisResult<()> {
2158        // Execute UNWATCH command
2159        match self.client.topology_type {
2160            TopologyType::Standalone => {
2161                if let Some(pool) = &self.client.standalone_pool {
2162                    let _result = pool.execute_command("UNWATCH".to_string(), vec![]).await?;
2163                    Ok(())
2164                } else {
2165                    Err(RedisError::Connection(
2166                        "No standalone pool available".to_string(),
2167                    ))
2168                }
2169            }
2170            TopologyType::Cluster => {
2171                let pools = self.client.cluster_pools.read().await;
2172                if let Some((_, pool)) = pools.iter().next() {
2173                    let _result = pool.execute_command("UNWATCH".to_string(), vec![]).await?;
2174                    Ok(())
2175                } else {
2176                    Err(RedisError::Cluster(
2177                        "No cluster nodes available".to_string(),
2178                    ))
2179                }
2180            }
2181        }
2182    }
2183}
2184
2185impl ClientTransactionExecutor {
2186    /// Get the node address for a given slot (cluster mode)
2187    async fn get_node_for_slot(&self, slot: u16) -> RedisResult<String> {
2188        if let Some(topology) = &self.client.cluster_topology {
2189            if let Some((host, port)) = topology.get_node_for_slot(slot).await {
2190                Ok(format!("{}:{}", host, port))
2191            } else {
2192                Err(RedisError::Cluster(format!(
2193                    "No node found for slot {}",
2194                    slot
2195                )))
2196            }
2197        } else {
2198            Err(RedisError::Cluster(
2199                "No cluster topology available".to_string(),
2200            ))
2201        }
2202    }
2203
2204    /// Get or create a pool for the given node address
2205    async fn get_or_create_pool(&self, node_addr: &str) -> RedisResult<Arc<Pool>> {
2206        let pools = self.client.cluster_pools.read().await;
2207        if let Some(pool) = pools.get(node_addr) {
2208            Ok(pool.clone())
2209        } else {
2210            drop(pools);
2211
2212            // Create new pool for this node
2213            let mut pools = self.client.cluster_pools.write().await;
2214
2215            // Double-check after acquiring write lock
2216            if let Some(pool) = pools.get(node_addr) {
2217                return Ok(pool.clone());
2218            }
2219
2220            // Parse node address
2221            let parts: Vec<&str> = node_addr.split(':').collect();
2222            if parts.len() != 2 {
2223                return Err(RedisError::Config(format!(
2224                    "Invalid node address: {}",
2225                    node_addr
2226                )));
2227            }
2228
2229            let host = parts[0];
2230            let port: u16 = parts[1].parse().map_err(|_| {
2231                RedisError::Config(format!("Invalid port in address: {}", node_addr))
2232            })?;
2233
2234            // Create config for this node
2235            let node_config = self.client.config.clone();
2236
2237            let pool = Arc::new(Pool::new(node_config, host.to_string(), port).await?);
2238            pools.insert(node_addr.to_string(), pool.clone());
2239
2240            Ok(pool)
2241        }
2242    }
2243}
2244
2245/// Pub/Sub connection implementation for Client
2246struct ClientPubSubConnection {
2247    client: Client,
2248}
2249
2250#[async_trait::async_trait]
2251impl PubSubConnection for ClientPubSubConnection {
2252    async fn subscribe(&mut self, channels: Vec<String>) -> RedisResult<()> {
2253        let mut args = vec![];
2254        for channel in channels {
2255            args.push(RespValue::from(channel));
2256        }
2257
2258        match self.client.topology_type {
2259            TopologyType::Standalone => {
2260                if let Some(pool) = &self.client.standalone_pool {
2261                    let _result = pool.execute_command("SUBSCRIBE".to_string(), args).await?;
2262                    Ok(())
2263                } else {
2264                    Err(RedisError::Connection(
2265                        "No standalone pool available".to_string(),
2266                    ))
2267                }
2268            }
2269            TopologyType::Cluster => {
2270                let pools = self.client.cluster_pools.read().await;
2271                if let Some((_, pool)) = pools.iter().next() {
2272                    let _result = pool.execute_command("SUBSCRIBE".to_string(), args).await?;
2273                    Ok(())
2274                } else {
2275                    Err(RedisError::Cluster(
2276                        "No cluster nodes available".to_string(),
2277                    ))
2278                }
2279            }
2280        }
2281    }
2282
2283    async fn unsubscribe(&mut self, channels: Vec<String>) -> RedisResult<()> {
2284        let mut args = vec![];
2285        for channel in channels {
2286            args.push(RespValue::from(channel));
2287        }
2288
2289        match self.client.topology_type {
2290            TopologyType::Standalone => {
2291                if let Some(pool) = &self.client.standalone_pool {
2292                    let _result = pool
2293                        .execute_command("UNSUBSCRIBE".to_string(), args)
2294                        .await?;
2295                    Ok(())
2296                } else {
2297                    Err(RedisError::Connection(
2298                        "No standalone pool available".to_string(),
2299                    ))
2300                }
2301            }
2302            TopologyType::Cluster => {
2303                let pools = self.client.cluster_pools.read().await;
2304                if let Some((_, pool)) = pools.iter().next() {
2305                    let _result = pool
2306                        .execute_command("UNSUBSCRIBE".to_string(), args)
2307                        .await?;
2308                    Ok(())
2309                } else {
2310                    Err(RedisError::Cluster(
2311                        "No cluster nodes available".to_string(),
2312                    ))
2313                }
2314            }
2315        }
2316    }
2317
2318    async fn psubscribe(&mut self, patterns: Vec<String>) -> RedisResult<()> {
2319        let mut args = vec![];
2320        for pattern in patterns {
2321            args.push(RespValue::from(pattern));
2322        }
2323
2324        match self.client.topology_type {
2325            TopologyType::Standalone => {
2326                if let Some(pool) = &self.client.standalone_pool {
2327                    let _result = pool.execute_command("PSUBSCRIBE".to_string(), args).await?;
2328                    Ok(())
2329                } else {
2330                    Err(RedisError::Connection(
2331                        "No standalone pool available".to_string(),
2332                    ))
2333                }
2334            }
2335            TopologyType::Cluster => {
2336                let pools = self.client.cluster_pools.read().await;
2337                if let Some((_, pool)) = pools.iter().next() {
2338                    let _result = pool.execute_command("PSUBSCRIBE".to_string(), args).await?;
2339                    Ok(())
2340                } else {
2341                    Err(RedisError::Cluster(
2342                        "No cluster nodes available".to_string(),
2343                    ))
2344                }
2345            }
2346        }
2347    }
2348
2349    async fn punsubscribe(&mut self, patterns: Vec<String>) -> RedisResult<()> {
2350        let mut args = vec![];
2351        for pattern in patterns {
2352            args.push(RespValue::from(pattern));
2353        }
2354
2355        match self.client.topology_type {
2356            TopologyType::Standalone => {
2357                if let Some(pool) = &self.client.standalone_pool {
2358                    let _result = pool
2359                        .execute_command("PUNSUBSCRIBE".to_string(), args)
2360                        .await?;
2361                    Ok(())
2362                } else {
2363                    Err(RedisError::Connection(
2364                        "No standalone pool available".to_string(),
2365                    ))
2366                }
2367            }
2368            TopologyType::Cluster => {
2369                let pools = self.client.cluster_pools.read().await;
2370                if let Some((_, pool)) = pools.iter().next() {
2371                    let _result = pool
2372                        .execute_command("PUNSUBSCRIBE".to_string(), args)
2373                        .await?;
2374                    Ok(())
2375                } else {
2376                    Err(RedisError::Cluster(
2377                        "No cluster nodes available".to_string(),
2378                    ))
2379                }
2380            }
2381        }
2382    }
2383
2384    async fn listen(
2385        &mut self,
2386        message_tx: tokio::sync::mpsc::UnboundedSender<crate::pubsub::PubSubMessage>,
2387    ) -> RedisResult<()> {
2388        // This is a simplified implementation
2389        // In a real implementation, this would maintain a persistent connection
2390        // and continuously listen for pub/sub messages
2391
2392        // For now, we'll just return Ok to satisfy the trait
2393        // A full implementation would require a dedicated connection for pub/sub
2394        // that stays open and continuously reads messages
2395
2396        // TODO: Implement proper pub/sub message listening
2397        // This would involve:
2398        // 1. Creating a dedicated connection for pub/sub
2399        // 2. Continuously reading RESP messages
2400        // 3. Parsing pub/sub messages and sending them through message_tx
2401
2402        drop(message_tx); // Avoid unused variable warning
2403        Ok(())
2404    }
2405
2406    async fn publish(&mut self, channel: String, message: String) -> RedisResult<i64> {
2407        let args = vec![RespValue::from(channel), RespValue::from(message)];
2408
2409        match self.client.topology_type {
2410            TopologyType::Standalone => {
2411                if let Some(pool) = &self.client.standalone_pool {
2412                    let result = pool.execute_command("PUBLISH".to_string(), args).await?;
2413                    result.as_int()
2414                } else {
2415                    Err(RedisError::Connection(
2416                        "No standalone pool available".to_string(),
2417                    ))
2418                }
2419            }
2420            TopologyType::Cluster => {
2421                let pools = self.client.cluster_pools.read().await;
2422                if let Some((_, pool)) = pools.iter().next() {
2423                    let result = pool.execute_command("PUBLISH".to_string(), args).await?;
2424                    result.as_int()
2425                } else {
2426                    Err(RedisError::Cluster(
2427                        "No cluster nodes available".to_string(),
2428                    ))
2429                }
2430            }
2431        }
2432    }
2433}
2434
2435#[cfg(test)]
2436mod tests {
2437    use super::*;
2438
2439    #[test]
2440    fn test_client_configuration() {
2441        let config = ConnectionConfig::new("redis://localhost:6379");
2442        assert!(!config.connection_string.is_empty());
2443    }
2444}