redis_oxide/
connection.rs1use crate::core::{
7 config::{ConnectionConfig, TopologyMode},
8 error::{RedisError, RedisResult},
9 value::RespValue,
10};
11use crate::protocol::{RespDecoder, RespEncoder};
12use bytes::{Buf, BytesMut};
13use std::io::Cursor;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tokio::net::TcpStream;
16use tokio::time::timeout;
17use tracing::{debug, info, warn};
18
/// Deployment layout of the Redis server a client is talking to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TopologyType {
    /// A single Redis server without cluster support.
    Standalone,
    /// A Redis Cluster deployment.
    Cluster,
}
27
28pub struct RedisConnection {
30 stream: TcpStream,
31 read_buffer: BytesMut,
32 config: ConnectionConfig,
33}
34
35impl RedisConnection {
36 pub async fn connect(host: &str, port: u16, config: ConnectionConfig) -> RedisResult<Self> {
38 let addr = format!("{}:{}", host, port);
39 debug!("Connecting to Redis at {}", addr);
40
41 let stream = timeout(config.connect_timeout, TcpStream::connect(&addr))
42 .await
43 .map_err(|_| RedisError::Timeout)?
44 .map_err(|e| RedisError::Connection(format!("Failed to connect to {}: {}", addr, e)))?;
45
46 if let Some(keepalive_duration) = config.tcp_keepalive {
48 let socket = socket2::Socket::from(stream.into_std()?);
49 let keepalive = socket2::TcpKeepalive::new().with_time(keepalive_duration);
50 socket.set_tcp_keepalive(&keepalive).map_err(|e| {
51 RedisError::Connection(format!("Failed to set TCP keepalive: {}", e))
52 })?;
53 let stream = TcpStream::from_std(socket.into())?;
54
55 let mut conn = Self {
56 stream,
57 read_buffer: BytesMut::with_capacity(8192),
58 config: config.clone(),
59 };
60
61 if let Some(ref password) = config.password {
63 conn.authenticate(password).await?;
64 }
65
66 Ok(conn)
67 } else {
68 let mut conn = Self {
69 stream,
70 read_buffer: BytesMut::with_capacity(8192),
71 config: config.clone(),
72 };
73
74 if let Some(ref password) = config.password {
76 conn.authenticate(password).await?;
77 }
78
79 Ok(conn)
80 }
81 }
82
83 async fn authenticate(&mut self, password: &str) -> RedisResult<()> {
85 debug!("Authenticating with Redis server");
86 let response = self
87 .execute_command("AUTH", &[RespValue::from(password)])
88 .await?;
89
90 match response {
91 RespValue::SimpleString(ref s) if s == "OK" => Ok(()),
92 RespValue::Error(e) => Err(RedisError::Auth(e)),
93 _ => Err(RedisError::Auth(
94 "Unexpected authentication response".to_string(),
95 )),
96 }
97 }
98
99 pub async fn execute_command(
101 &mut self,
102 command: &str,
103 args: &[RespValue],
104 ) -> RedisResult<RespValue> {
105 let encoded = RespEncoder::encode_command(command, args)?;
107
108 timeout(
110 self.config.operation_timeout,
111 self.stream.write_all(&encoded),
112 )
113 .await
114 .map_err(|_| RedisError::Timeout)?
115 .map_err(RedisError::Io)?;
116
117 let response = timeout(self.config.operation_timeout, self.read_response())
119 .await
120 .map_err(|_| RedisError::Timeout)??;
121
122 if let RespValue::Error(ref msg) = response {
124 if let Some(redirect_error) = RedisError::parse_redirect(msg) {
125 return Err(redirect_error);
126 }
127 return Err(RedisError::Server(msg.clone()));
128 }
129
130 Ok(response)
131 }
132
133 async fn read_response(&mut self) -> RedisResult<RespValue> {
135 loop {
136 let mut cursor = Cursor::new(&self.read_buffer[..]);
138 if let Some(value) = RespDecoder::decode(&mut cursor)? {
139 let pos = cursor.position() as usize;
140 self.read_buffer.advance(pos);
141 return Ok(value);
142 }
143
144 let n = self.stream.read_buf(&mut self.read_buffer).await?;
146 if n == 0 {
147 return Err(RedisError::Connection(
148 "Connection closed by server".to_string(),
149 ));
150 }
151 }
152 }
153
154 pub async fn detect_topology(&mut self) -> RedisResult<TopologyType> {
156 info!("Detecting Redis topology");
157
158 match self
160 .execute_command("CLUSTER", &[RespValue::from("INFO")])
161 .await
162 {
163 Ok(RespValue::BulkString(data)) => {
164 let info_str = String::from_utf8(data.to_vec())
165 .map_err(|e| RedisError::Protocol(format!("Invalid UTF-8: {}", e)))?;
166
167 if info_str.contains("cluster_enabled:1") || info_str.contains("cluster_state:ok") {
169 info!("Detected Redis Cluster");
170 return Ok(TopologyType::Cluster);
171 }
172 }
173 Ok(RespValue::SimpleString(info_str)) => {
174 if info_str.contains("cluster_enabled:1") || info_str.contains("cluster_state:ok") {
176 info!("Detected Redis Cluster");
177 return Ok(TopologyType::Cluster);
178 }
179 }
180 Ok(RespValue::Error(ref e))
181 if e.contains("command not supported")
182 || e.contains("unknown command")
183 || e.contains("disabled") =>
184 {
185 info!("Detected Standalone Redis (CLUSTER command not available)");
187 return Ok(TopologyType::Standalone);
188 }
189 Err(RedisError::Server(ref e))
190 if e.contains("command not supported")
191 || e.contains("unknown command")
192 || e.contains("disabled") =>
193 {
194 info!("Detected Standalone Redis (CLUSTER command not available)");
195 return Ok(TopologyType::Standalone);
196 }
197 Err(e) => {
198 warn!("Error detecting topology: {:?}, assuming standalone", e);
199 return Ok(TopologyType::Standalone);
200 }
201 _ => {}
202 }
203
204 info!("Detected Standalone Redis");
205 Ok(TopologyType::Standalone)
206 }
207
208 pub async fn select_database(&mut self, db: u8) -> RedisResult<()> {
210 let response = self
211 .execute_command("SELECT", &[RespValue::from(db as i64)])
212 .await?;
213
214 match response {
215 RespValue::SimpleString(ref s) if s == "OK" => Ok(()),
216 RespValue::Error(e) => Err(RedisError::Server(e)),
217 _ => Err(RedisError::UnexpectedResponse(format!("{:?}", response))),
218 }
219 }
220}
221
222pub struct ConnectionManager {
224 config: ConnectionConfig,
225 topology: Option<TopologyType>,
226}
227
228impl ConnectionManager {
229 pub fn new(config: ConnectionConfig) -> Self {
231 Self {
232 config,
233 topology: None,
234 }
235 }
236
237 pub async fn get_topology(&mut self) -> RedisResult<TopologyType> {
239 if let Some(topology) = self.topology {
240 return Ok(topology);
241 }
242
243 match self.config.topology_mode {
245 TopologyMode::Standalone => {
246 self.topology = Some(TopologyType::Standalone);
247 Ok(TopologyType::Standalone)
248 }
249 TopologyMode::Cluster => {
250 self.topology = Some(TopologyType::Cluster);
251 Ok(TopologyType::Cluster)
252 }
253 TopologyMode::Auto => {
254 let endpoints = self.config.parse_endpoints();
256 if endpoints.is_empty() {
257 return Err(RedisError::Config("No endpoints specified".to_string()));
258 }
259
260 let (host, port) = &endpoints[0];
261 let mut conn = RedisConnection::connect(host, *port, self.config.clone()).await?;
262 let topology = conn.detect_topology().await?;
263 self.topology = Some(topology);
264 Ok(topology)
265 }
266 }
267 }
268
269 pub async fn create_connection(&self, host: &str, port: u16) -> RedisResult<RedisConnection> {
271 RedisConnection::connect(host, port, self.config.clone()).await
272 }
273
274 pub fn config(&self) -> &ConnectionConfig {
276 &self.config
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created manager has not resolved any topology.
    #[test]
    fn test_connection_manager_creation() {
        let manager = ConnectionManager::new(ConnectionConfig::new("redis://localhost:6379"));

        assert!(manager.topology.is_none());
    }

    /// A topology mode forced through the builder is kept in the manager's configuration.
    #[test]
    fn test_forced_topology() {
        let forced = ConnectionConfig::new("redis://localhost:6379")
            .with_topology_mode(TopologyMode::Standalone);

        let manager = ConnectionManager::new(forced);

        assert_eq!(manager.config.topology_mode, TopologyMode::Standalone);
    }
}