redis_oxide/
connection.rs

1//! Connection management and topology detection
2//!
3//! This module handles low-level TCP connections to Redis servers,
4//! automatic topology detection, and connection lifecycle management.
5
6use crate::core::{
7    config::{ConnectionConfig, TopologyMode},
8    error::{RedisError, RedisResult},
9    value::RespValue,
10};
11use crate::protocol::{RespDecoder, RespEncoder};
12use bytes::{Buf, BytesMut};
13use std::io::Cursor;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tokio::net::TcpStream;
16use tokio::time::timeout;
17use tracing::{debug, info, warn};
18
/// Deployment topology of the Redis server a client is talking to.
///
/// The value is small and `Copy`, so it can be cached and handed around by value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TopologyType {
    /// A single, non-clustered Redis server.
    Standalone,
    /// A node that is part of a Redis Cluster.
    Cluster,
}
27
/// A connection to a Redis server
///
/// Bundles the TCP stream, the buffer of bytes received but not yet decoded,
/// and the configuration the connection was opened with.
pub struct RedisConnection {
    /// TCP stream to the server; commands are written to it and replies read from it.
    stream: TcpStream,
    /// Bytes read from `stream` that have not yet been consumed by the RESP decoder.
    read_buffer: BytesMut,
    /// Configuration this connection was created with (supplies the per-operation timeout).
    config: ConnectionConfig,
}
34
35impl RedisConnection {
36    /// Connect to a Redis server
37    pub async fn connect(host: &str, port: u16, config: ConnectionConfig) -> RedisResult<Self> {
38        let addr = format!("{}:{}", host, port);
39        debug!("Connecting to Redis at {}", addr);
40
41        let stream = timeout(config.connect_timeout, TcpStream::connect(&addr))
42            .await
43            .map_err(|_| RedisError::Timeout)?
44            .map_err(|e| RedisError::Connection(format!("Failed to connect to {}: {}", addr, e)))?;
45
46        // Set TCP keepalive if configured
47        if let Some(keepalive_duration) = config.tcp_keepalive {
48            let socket = socket2::Socket::from(stream.into_std()?);
49            let keepalive = socket2::TcpKeepalive::new().with_time(keepalive_duration);
50            socket.set_tcp_keepalive(&keepalive).map_err(|e| {
51                RedisError::Connection(format!("Failed to set TCP keepalive: {}", e))
52            })?;
53            let stream = TcpStream::from_std(socket.into())?;
54
55            let mut conn = Self {
56                stream,
57                read_buffer: BytesMut::with_capacity(8192),
58                config: config.clone(),
59            };
60
61            // Authenticate if password is provided
62            if let Some(ref password) = config.password {
63                conn.authenticate(password).await?;
64            }
65
66            Ok(conn)
67        } else {
68            let mut conn = Self {
69                stream,
70                read_buffer: BytesMut::with_capacity(8192),
71                config: config.clone(),
72            };
73
74            // Authenticate if password is provided
75            if let Some(ref password) = config.password {
76                conn.authenticate(password).await?;
77            }
78
79            Ok(conn)
80        }
81    }
82
83    /// Authenticate with the Redis server
84    async fn authenticate(&mut self, password: &str) -> RedisResult<()> {
85        debug!("Authenticating with Redis server");
86        let response = self
87            .execute_command("AUTH", &[RespValue::from(password)])
88            .await?;
89
90        match response {
91            RespValue::SimpleString(ref s) if s == "OK" => Ok(()),
92            RespValue::Error(e) => Err(RedisError::Auth(e)),
93            _ => Err(RedisError::Auth(
94                "Unexpected authentication response".to_string(),
95            )),
96        }
97    }
98
99    /// Execute a command and return the response
100    pub async fn execute_command(
101        &mut self,
102        command: &str,
103        args: &[RespValue],
104    ) -> RedisResult<RespValue> {
105        // Encode command
106        let encoded = RespEncoder::encode_command(command, args)?;
107
108        // Send command with timeout
109        timeout(
110            self.config.operation_timeout,
111            self.stream.write_all(&encoded),
112        )
113        .await
114        .map_err(|_| RedisError::Timeout)?
115        .map_err(RedisError::Io)?;
116
117        // Read response with timeout
118        let response = timeout(self.config.operation_timeout, self.read_response())
119            .await
120            .map_err(|_| RedisError::Timeout)??;
121
122        // Check if response is an error and parse redirects
123        if let RespValue::Error(ref msg) = response {
124            if let Some(redirect_error) = RedisError::parse_redirect(msg) {
125                return Err(redirect_error);
126            }
127            return Err(RedisError::Server(msg.clone()));
128        }
129
130        Ok(response)
131    }
132
133    /// Read a complete RESP response from the connection
134    async fn read_response(&mut self) -> RedisResult<RespValue> {
135        loop {
136            // Try to decode from existing buffer
137            let mut cursor = Cursor::new(&self.read_buffer[..]);
138            if let Some(value) = RespDecoder::decode(&mut cursor)? {
139                let pos = cursor.position() as usize;
140                self.read_buffer.advance(pos);
141                return Ok(value);
142            }
143
144            // Need more data - read from socket
145            let n = self.stream.read_buf(&mut self.read_buffer).await?;
146            if n == 0 {
147                return Err(RedisError::Connection(
148                    "Connection closed by server".to_string(),
149                ));
150            }
151        }
152    }
153
154    /// Detect the topology type of the Redis server
155    pub async fn detect_topology(&mut self) -> RedisResult<TopologyType> {
156        info!("Detecting Redis topology");
157
158        // Try CLUSTER INFO command
159        match self
160            .execute_command("CLUSTER", &[RespValue::from("INFO")])
161            .await
162        {
163            Ok(RespValue::BulkString(data)) => {
164                let info_str = String::from_utf8(data.to_vec())
165                    .map_err(|e| RedisError::Protocol(format!("Invalid UTF-8: {}", e)))?;
166
167                // Parse cluster_state
168                if info_str.contains("cluster_enabled:1") || info_str.contains("cluster_state:ok") {
169                    info!("Detected Redis Cluster");
170                    return Ok(TopologyType::Cluster);
171                }
172            }
173            Ok(RespValue::SimpleString(info_str)) => {
174                // Parse cluster_state
175                if info_str.contains("cluster_enabled:1") || info_str.contains("cluster_state:ok") {
176                    info!("Detected Redis Cluster");
177                    return Ok(TopologyType::Cluster);
178                }
179            }
180            Ok(RespValue::Error(ref e))
181                if e.contains("command not supported")
182                    || e.contains("unknown command")
183                    || e.contains("disabled") =>
184            {
185                // Cluster commands not available - this is standalone
186                info!("Detected Standalone Redis (CLUSTER command not available)");
187                return Ok(TopologyType::Standalone);
188            }
189            Err(RedisError::Server(ref e))
190                if e.contains("command not supported")
191                    || e.contains("unknown command")
192                    || e.contains("disabled") =>
193            {
194                info!("Detected Standalone Redis (CLUSTER command not available)");
195                return Ok(TopologyType::Standalone);
196            }
197            Err(e) => {
198                warn!("Error detecting topology: {:?}, assuming standalone", e);
199                return Ok(TopologyType::Standalone);
200            }
201            _ => {}
202        }
203
204        info!("Detected Standalone Redis");
205        Ok(TopologyType::Standalone)
206    }
207
208    /// Select a database (only works in standalone mode)
209    pub async fn select_database(&mut self, db: u8) -> RedisResult<()> {
210        let response = self
211            .execute_command("SELECT", &[RespValue::from(db as i64)])
212            .await?;
213
214        match response {
215            RespValue::SimpleString(ref s) if s == "OK" => Ok(()),
216            RespValue::Error(e) => Err(RedisError::Server(e)),
217            _ => Err(RedisError::UnexpectedResponse(format!("{:?}", response))),
218        }
219    }
220}
221
222/// Connection manager that handles topology detection and connection creation
223pub struct ConnectionManager {
224    config: ConnectionConfig,
225    topology: Option<TopologyType>,
226}
227
228impl ConnectionManager {
229    /// Create a new connection manager
230    pub fn new(config: ConnectionConfig) -> Self {
231        Self {
232            config,
233            topology: None,
234        }
235    }
236
237    /// Get or detect the topology type
238    pub async fn get_topology(&mut self) -> RedisResult<TopologyType> {
239        if let Some(topology) = self.topology {
240            return Ok(topology);
241        }
242
243        // Check if topology mode is forced
244        match self.config.topology_mode {
245            TopologyMode::Standalone => {
246                self.topology = Some(TopologyType::Standalone);
247                Ok(TopologyType::Standalone)
248            }
249            TopologyMode::Cluster => {
250                self.topology = Some(TopologyType::Cluster);
251                Ok(TopologyType::Cluster)
252            }
253            TopologyMode::Auto => {
254                // Auto-detect
255                let endpoints = self.config.parse_endpoints();
256                if endpoints.is_empty() {
257                    return Err(RedisError::Config("No endpoints specified".to_string()));
258                }
259
260                let (host, port) = &endpoints[0];
261                let mut conn = RedisConnection::connect(host, *port, self.config.clone()).await?;
262                let topology = conn.detect_topology().await?;
263                self.topology = Some(topology);
264                Ok(topology)
265            }
266        }
267    }
268
269    /// Create a new connection to the specified host and port
270    pub async fn create_connection(&self, host: &str, port: u16) -> RedisResult<RedisConnection> {
271        RedisConnection::connect(host, port, self.config.clone()).await
272    }
273
274    /// Get the configuration
275    pub fn config(&self) -> &ConnectionConfig {
276        &self.config
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed manager has not resolved a topology yet.
    #[test]
    fn test_connection_manager_creation() {
        let manager = ConnectionManager::new(ConnectionConfig::new("redis://localhost:6379"));

        assert!(manager.topology.is_none());
    }

    /// A forced topology mode set on the config is carried into the manager.
    #[test]
    fn test_forced_topology() {
        let forced = ConnectionConfig::new("redis://localhost:6379")
            .with_topology_mode(TopologyMode::Standalone);
        let manager = ConnectionManager::new(forced);

        // Resolving the topology itself is async; here only the stored mode is checked.
        assert_eq!(manager.config().topology_mode, TopologyMode::Standalone);
    }
}