redis_oxide/protocol/
mod.rs1pub mod resp2;
7pub mod resp2_optimized;
8pub mod resp3;
9
10pub use resp2::{RespDecoder, RespEncoder};
12pub use resp2_optimized::{OptimizedRespDecoder, OptimizedRespEncoder};
13pub use resp3::{Resp3Decoder, Resp3Encoder, Resp3Value};
14
/// RESP protocol version that a connection can speak.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProtocolVersion {
    /// The RESP2 protocol.
    Resp2,
    /// The RESP3 protocol.
    Resp3,
}
23
24impl Default for ProtocolVersion {
25 fn default() -> Self {
26 Self::Resp2
27 }
28}
29
30impl std::fmt::Display for ProtocolVersion {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 Self::Resp2 => write!(f, "RESP2"),
34 Self::Resp3 => write!(f, "RESP3"),
35 }
36 }
37}
38
39#[derive(Debug, Clone)]
41pub struct ProtocolNegotiation {
42 pub version: ProtocolVersion,
44 pub capabilities: Vec<String>,
46}
47
48impl ProtocolNegotiation {
49 #[must_use]
51 pub const fn new(version: ProtocolVersion) -> Self {
52 Self {
53 version,
54 capabilities: Vec::new(),
55 }
56 }
57
58 #[must_use]
60 pub fn resp3_with_capabilities(capabilities: Vec<String>) -> Self {
61 Self {
62 version: ProtocolVersion::Resp3,
63 capabilities,
64 }
65 }
66
67 #[must_use]
69 pub fn has_capability(&self, capability: &str) -> bool {
70 self.capabilities.iter().any(|c| c == capability)
71 }
72}
73
74pub struct ProtocolNegotiator {
76 preferred_version: ProtocolVersion,
77}
78
79impl ProtocolNegotiator {
80 #[must_use]
82 pub const fn new(preferred_version: ProtocolVersion) -> Self {
83 Self { preferred_version }
84 }
85
86 pub async fn negotiate<T>(
96 &self,
97 connection: &mut T,
98 ) -> crate::core::error::RedisResult<ProtocolNegotiation>
99 where
100 T: ProtocolConnection,
101 {
102 match self.preferred_version {
103 ProtocolVersion::Resp2 => Ok(ProtocolNegotiation::new(ProtocolVersion::Resp2)),
104 ProtocolVersion::Resp3 => {
105 match self.try_negotiate_resp3(connection).await {
107 Ok(negotiation) => Ok(negotiation),
108 Err(_) => {
109 Ok(ProtocolNegotiation::new(ProtocolVersion::Resp2))
111 }
112 }
113 }
114 }
115 }
116
117 async fn try_negotiate_resp3<T>(
118 &self,
119 connection: &mut T,
120 ) -> crate::core::error::RedisResult<ProtocolNegotiation>
121 where
122 T: ProtocolConnection,
123 {
124 let hello_cmd = crate::core::value::RespValue::Array(vec![
126 crate::core::value::RespValue::BulkString(bytes::Bytes::from("HELLO")),
127 crate::core::value::RespValue::BulkString(bytes::Bytes::from("3")),
128 ]);
129
130 connection.send_command(&hello_cmd).await?;
131 let response = connection.read_response().await?;
132
133 match response {
135 crate::core::value::RespValue::Array(items) => {
136 let mut capabilities = Vec::new();
137
138 if items.len() >= 4 {
140 for item in items.iter().skip(3) {
142 if let crate::core::value::RespValue::BulkString(cap) = item {
143 if let Ok(cap_str) = String::from_utf8(cap.to_vec()) {
144 capabilities.push(cap_str);
145 }
146 }
147 }
148 }
149
150 Ok(ProtocolNegotiation::resp3_with_capabilities(capabilities))
151 }
152 _ => Err(crate::core::error::RedisError::Protocol(
153 "Invalid HELLO response".to_string(),
154 )),
155 }
156 }
157}
158
159impl Default for ProtocolNegotiator {
160 fn default() -> Self {
161 Self::new(ProtocolVersion::Resp2)
162 }
163}
164
165#[async_trait::async_trait]
167pub trait ProtocolConnection {
168 async fn send_command(
170 &mut self,
171 command: &crate::core::value::RespValue,
172 ) -> crate::core::error::RedisResult<()>;
173
174 async fn read_response(
176 &mut self,
177 ) -> crate::core::error::RedisResult<crate::core::value::RespValue>;
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Each version displays as its upper-case name.
    #[test]
    fn test_protocol_version_display() {
        let expectations = [
            (ProtocolVersion::Resp2, "RESP2"),
            (ProtocolVersion::Resp3, "RESP3"),
        ];

        for (version, rendered) in expectations.iter() {
            assert_eq!(version.to_string(), *rendered);
        }
    }

    /// Constructors set the version and capabilities; lookup is exact-match.
    #[test]
    fn test_protocol_negotiation() {
        let plain = ProtocolNegotiation::new(ProtocolVersion::Resp2);
        assert_eq!(plain.version, ProtocolVersion::Resp2);
        assert!(plain.capabilities.is_empty());

        let advertised = vec![String::from("push"), String::from("streams")];
        let upgraded = ProtocolNegotiation::resp3_with_capabilities(advertised);
        assert_eq!(upgraded.version, ProtocolVersion::Resp3);
        for known in &["push", "streams"] {
            assert!(upgraded.has_capability(known));
        }
        assert!(!upgraded.has_capability("unknown"));
    }

    /// `new` stores the requested preference; `default` prefers RESP2.
    #[test]
    fn test_protocol_negotiator() {
        let explicit = ProtocolNegotiator::new(ProtocolVersion::Resp3);
        assert_eq!(explicit.preferred_version, ProtocolVersion::Resp3);

        let fallback = ProtocolNegotiator::default();
        assert_eq!(fallback.preferred_version, ProtocolVersion::Resp2);
    }
}