redis_oxide/protocol/
mod.rs

1//! Redis protocol implementations
2//!
3//! This module contains implementations for both RESP2 and RESP3 protocols,
4//! providing encoding and decoding functionality for Redis communication.
5
6pub mod resp2;
7pub mod resp2_optimized;
8pub mod resp3;
9
10// Re-export the existing protocol functionality
11pub use resp2::{RespDecoder, RespEncoder};
12pub use resp2_optimized::{OptimizedRespDecoder, OptimizedRespEncoder};
13pub use resp3::{Resp3Decoder, Resp3Encoder, Resp3Value};
14
/// Wire protocol dialect spoken on a Redis connection.
///
/// The type is a small `Copy` tag. It derives `Hash` alongside `Eq` so that it
/// can be used directly as a `HashMap` / `HashSet` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProtocolVersion {
    /// RESP2 (Redis Serialization Protocol version 2)
    Resp2,
    /// RESP3 (Redis Serialization Protocol version 3)
    Resp3,
}
23
24impl Default for ProtocolVersion {
25    fn default() -> Self {
26        Self::Resp2
27    }
28}
29
30impl std::fmt::Display for ProtocolVersion {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            Self::Resp2 => write!(f, "RESP2"),
34            Self::Resp3 => write!(f, "RESP3"),
35        }
36    }
37}
38
39/// Protocol negotiation result
40#[derive(Debug, Clone)]
41pub struct ProtocolNegotiation {
42    /// The negotiated protocol version
43    pub version: ProtocolVersion,
44    /// Server capabilities (for RESP3)
45    pub capabilities: Vec<String>,
46}
47
48impl ProtocolNegotiation {
49    /// Create a new protocol negotiation result
50    #[must_use]
51    pub const fn new(version: ProtocolVersion) -> Self {
52        Self {
53            version,
54            capabilities: Vec::new(),
55        }
56    }
57
58    /// Create a RESP3 negotiation with capabilities
59    #[must_use]
60    pub fn resp3_with_capabilities(capabilities: Vec<String>) -> Self {
61        Self {
62            version: ProtocolVersion::Resp3,
63            capabilities,
64        }
65    }
66
67    /// Check if a capability is supported
68    #[must_use]
69    pub fn has_capability(&self, capability: &str) -> bool {
70        self.capabilities.iter().any(|c| c == capability)
71    }
72}
73
74/// Protocol negotiator for handling RESP2/RESP3 protocol selection
75pub struct ProtocolNegotiator {
76    preferred_version: ProtocolVersion,
77}
78
79impl ProtocolNegotiator {
80    /// Create a new protocol negotiator
81    #[must_use]
82    pub const fn new(preferred_version: ProtocolVersion) -> Self {
83        Self { preferred_version }
84    }
85
86    /// Negotiate protocol version with the server
87    ///
88    /// This method attempts to negotiate the preferred protocol version.
89    /// If RESP3 is preferred, it sends a HELLO command to negotiate.
90    /// Falls back to RESP2 if negotiation fails.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if protocol negotiation fails completely.
95    pub async fn negotiate<T>(
96        &self,
97        connection: &mut T,
98    ) -> crate::core::error::RedisResult<ProtocolNegotiation>
99    where
100        T: ProtocolConnection,
101    {
102        match self.preferred_version {
103            ProtocolVersion::Resp2 => Ok(ProtocolNegotiation::new(ProtocolVersion::Resp2)),
104            ProtocolVersion::Resp3 => {
105                // Try to negotiate RESP3
106                match self.try_negotiate_resp3(connection).await {
107                    Ok(negotiation) => Ok(negotiation),
108                    Err(_) => {
109                        // Fall back to RESP2
110                        Ok(ProtocolNegotiation::new(ProtocolVersion::Resp2))
111                    }
112                }
113            }
114        }
115    }
116
117    async fn try_negotiate_resp3<T>(
118        &self,
119        connection: &mut T,
120    ) -> crate::core::error::RedisResult<ProtocolNegotiation>
121    where
122        T: ProtocolConnection,
123    {
124        // Send HELLO 3 command to negotiate RESP3
125        let hello_cmd = crate::core::value::RespValue::Array(vec![
126            crate::core::value::RespValue::BulkString(bytes::Bytes::from("HELLO")),
127            crate::core::value::RespValue::BulkString(bytes::Bytes::from("3")),
128        ]);
129
130        connection.send_command(&hello_cmd).await?;
131        let response = connection.read_response().await?;
132
133        // Parse HELLO response to extract capabilities
134        match response {
135            crate::core::value::RespValue::Array(items) => {
136                let mut capabilities = Vec::new();
137
138                // HELLO response format: [server, version, proto, capabilities...]
139                if items.len() >= 4 {
140                    // Extract capabilities from the response
141                    for item in items.iter().skip(3) {
142                        if let crate::core::value::RespValue::BulkString(cap) = item {
143                            if let Ok(cap_str) = String::from_utf8(cap.to_vec()) {
144                                capabilities.push(cap_str);
145                            }
146                        }
147                    }
148                }
149
150                Ok(ProtocolNegotiation::resp3_with_capabilities(capabilities))
151            }
152            _ => Err(crate::core::error::RedisError::Protocol(
153                "Invalid HELLO response".to_string(),
154            )),
155        }
156    }
157}
158
159impl Default for ProtocolNegotiator {
160    fn default() -> Self {
161        Self::new(ProtocolVersion::Resp2)
162    }
163}
164
/// Trait for connections that support protocol negotiation.
///
/// [`ProtocolNegotiator`] uses it to write one command and then read one
/// reply, so any connection type able to do both can implement it.
#[async_trait::async_trait]
pub trait ProtocolConnection {
    /// Send a single command to the server.
    ///
    /// # Errors
    ///
    /// Returns an error if the command could not be sent; the exact failure
    /// conditions are defined by the implementor.
    async fn send_command(
        &mut self,
        command: &crate::core::value::RespValue,
    ) -> crate::core::error::RedisResult<()>;

    /// Read a single response from the server.
    ///
    /// # Errors
    ///
    /// Returns an error if no response could be read; the exact failure
    /// conditions are defined by the implementor.
    async fn read_response(
        &mut self,
    ) -> crate::core::error::RedisResult<crate::core::value::RespValue>;
}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` renders each version as its upper-case protocol name.
    #[test]
    fn test_protocol_version_display() {
        let cases = [
            (ProtocolVersion::Resp2, "RESP2"),
            (ProtocolVersion::Resp3, "RESP3"),
        ];
        for (version, expected) in &cases {
            assert_eq!(version.to_string(), *expected);
        }
    }

    /// Both constructors populate the fields and capability lookup works.
    #[test]
    fn test_protocol_negotiation() {
        let resp2 = ProtocolNegotiation::new(ProtocolVersion::Resp2);
        assert_eq!(resp2.version, ProtocolVersion::Resp2);
        assert!(resp2.capabilities.is_empty());

        let advertised = vec!["push".to_string(), "streams".to_string()];
        let resp3 = ProtocolNegotiation::resp3_with_capabilities(advertised);
        assert_eq!(resp3.version, ProtocolVersion::Resp3);
        for present in &["push", "streams"] {
            assert!(resp3.has_capability(present));
        }
        assert!(!resp3.has_capability("unknown"));
    }

    /// `new` stores the requested preference; `default` prefers RESP2.
    #[test]
    fn test_protocol_negotiator() {
        let explicit = ProtocolNegotiator::new(ProtocolVersion::Resp3);
        assert_eq!(explicit.preferred_version, ProtocolVersion::Resp3);

        let fallback = ProtocolNegotiator::default();
        assert_eq!(fallback.preferred_version, ProtocolVersion::Resp2);
    }
}