Skip to main content

resp_proto/
cluster.rs

1//! Redis Cluster protocol building blocks.
2//!
3//! Stateless utilities for Redis Cluster: CRC16 hash slot calculation,
4//! MOVED/ASK redirect parsing, and CLUSTER SLOTS response decoding.
5
6use crate::Value;
7
8// ============================================================================
9// CRC16-XMODEM
10// ============================================================================
11
12/// CRC16-XMODEM lookup table (same polynomial as Redis `src/crc16.c`).
13#[rustfmt::skip]
14static CRC16_TABLE: [u16; 256] = [
15    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
16    0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
17    0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
18    0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
19    0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
20    0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
21    0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
22    0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
23    0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
24    0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
25    0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
26    0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
27    0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
28    0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
29    0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
30    0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
31    0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
32    0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
33    0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
34    0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
35    0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
36    0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
37    0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
38    0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
39    0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
40    0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
41    0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
42    0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
43    0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
44    0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
45    0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
46    0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0,
47];
48
49/// Compute CRC16-XMODEM checksum (same algorithm as Redis).
50#[inline]
51pub fn crc16(data: &[u8]) -> u16 {
52    let mut crc: u16 = 0;
53    for &byte in data {
54        let index = ((crc >> 8) ^ byte as u16) as u8;
55        crc = (crc << 8) ^ CRC16_TABLE[index as usize];
56    }
57    crc
58}
59
60// ============================================================================
61// Hash Slot
62// ============================================================================
63
64/// Total number of hash slots in a Redis Cluster.
65pub const SLOT_COUNT: u16 = 16384;
66
67/// Compute the hash slot for a key.
68///
69/// If the key contains a hash tag (first `{` to next `}` with non-empty content),
70/// only the content inside the braces is hashed.
71#[inline]
72pub fn hash_slot(key: &[u8]) -> u16 {
73    let data = match memchr::memchr(b'{', key) {
74        Some(start) => {
75            let rest = &key[start + 1..];
76            match memchr::memchr(b'}', rest) {
77                Some(0) => key,            // empty tag like {}
78                Some(end) => &rest[..end], // use tag content
79                None => key,               // no closing brace
80            }
81        }
82        None => key,
83    };
84    crc16(data) % SLOT_COUNT
85}
86
87// ============================================================================
88// Redirect Parsing
89// ============================================================================
90
91/// The kind of cluster redirect.
92#[derive(Debug, Clone, Copy, PartialEq, Eq)]
93pub enum RedirectKind {
94    /// MOVED — slot permanently owned by another node.
95    Moved,
96    /// ASK — one-time redirect during slot migration.
97    Ask,
98}
99
100/// A parsed MOVED or ASK redirect.
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct Redirect {
103    pub kind: RedirectKind,
104    pub slot: u16,
105    /// Address as `host:port` string. Kept as String because some deployments use hostnames.
106    pub address: String,
107}
108
109/// Parse a MOVED or ASK redirect from a RESP error value.
110///
111/// Returns `None` if the value is not an error or not a redirect.
112pub fn parse_redirect(value: &Value) -> Option<Redirect> {
113    let msg = match value {
114        Value::Error(e) => e,
115        _ => return None,
116    };
117
118    let s = std::str::from_utf8(msg).ok()?;
119    let (kind, rest) = if let Some(rest) = s.strip_prefix("MOVED ") {
120        (RedirectKind::Moved, rest)
121    } else if let Some(rest) = s.strip_prefix("ASK ") {
122        (RedirectKind::Ask, rest)
123    } else {
124        return None;
125    };
126
127    let mut parts = rest.splitn(2, ' ');
128    let slot: u16 = parts.next()?.parse().ok()?;
129    let address = parts.next()?;
130    if address.is_empty() {
131        return None;
132    }
133
134    Some(Redirect {
135        kind,
136        slot,
137        address: address.to_string(),
138    })
139}
140
141// ============================================================================
142// CLUSTER SLOTS Response Parsing
143// ============================================================================
144
145/// Information about a single cluster node.
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct NodeInfo {
148    /// Address as `host:port`.
149    pub address: String,
150    /// Optional node ID (40-char hex string).
151    pub node_id: Option<String>,
152}
153
154/// A contiguous range of hash slots owned by a primary (with optional replicas).
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct SlotRange {
157    /// Inclusive start slot.
158    pub start: u16,
159    /// Inclusive end slot.
160    pub end: u16,
161    /// Primary node for this range.
162    pub primary: NodeInfo,
163    /// Replica nodes.
164    pub replicas: Vec<NodeInfo>,
165}
166
167/// A parsed slot map from `CLUSTER SLOTS`.
168///
169/// Slot ranges are sorted by start slot for binary-search lookups.
170#[derive(Debug, Clone)]
171pub struct SlotMap {
172    ranges: Vec<SlotRange>,
173}
174
175impl SlotMap {
176    /// Parse a `CLUSTER SLOTS` response into a slot map.
177    ///
178    /// Returns `None` if the value is not a valid CLUSTER SLOTS response.
179    pub fn from_cluster_slots(value: &Value) -> Option<Self> {
180        let entries = match value {
181            Value::Array(arr) => arr,
182            _ => return None,
183        };
184
185        let mut ranges = Vec::with_capacity(entries.len());
186
187        for entry in entries {
188            let items = match entry {
189                Value::Array(arr) => arr,
190                _ => return None,
191            };
192
193            // Minimum: [start, end, primary_node]
194            if items.len() < 3 {
195                return None;
196            }
197
198            let start = int_value(&items[0])? as u16;
199            let end = int_value(&items[1])? as u16;
200            let primary = parse_node_info(&items[2])?;
201
202            let mut replicas = Vec::with_capacity(items.len().saturating_sub(3));
203            for item in &items[3..] {
204                replicas.push(parse_node_info(item)?);
205            }
206
207            ranges.push(SlotRange {
208                start,
209                end,
210                primary,
211                replicas,
212            });
213        }
214
215        ranges.sort_by_key(|r| r.start);
216
217        Some(SlotMap { ranges })
218    }
219
220    /// Look up the slot range that contains the given slot.
221    pub fn lookup(&self, slot: u16) -> Option<&SlotRange> {
222        let idx = self
223            .ranges
224            .partition_point(|r| r.start <= slot)
225            .checked_sub(1)?;
226        let range = &self.ranges[idx];
227        if slot <= range.end { Some(range) } else { None }
228    }
229
230    /// Returns the slot ranges.
231    pub fn ranges(&self) -> &[SlotRange] {
232        &self.ranges
233    }
234
235    /// Returns true if the slot map is empty.
236    pub fn is_empty(&self) -> bool {
237        self.ranges.is_empty()
238    }
239}
240
241/// Extract an integer from a RESP value (Integer or BulkString).
242fn int_value(value: &Value) -> Option<i64> {
243    match value {
244        Value::Integer(n) => Some(*n),
245        Value::BulkString(s) => std::str::from_utf8(s).ok()?.parse().ok(),
246        _ => None,
247    }
248}
249
250/// Parse a node info array: `[ip, port]` or `[ip, port, node_id]`.
251fn parse_node_info(value: &Value) -> Option<NodeInfo> {
252    let items = match value {
253        Value::Array(arr) => arr,
254        _ => return None,
255    };
256
257    if items.len() < 2 {
258        return None;
259    }
260
261    let ip = match &items[0] {
262        Value::BulkString(s) => std::str::from_utf8(s).ok()?,
263        _ => return None,
264    };
265    let port = int_value(&items[1])?;
266
267    let address = format!("{}:{}", ip, port);
268
269    let node_id = if items.len() >= 3 {
270        match &items[2] {
271            Value::BulkString(s) => {
272                let id = std::str::from_utf8(s).ok()?;
273                if id.is_empty() {
274                    None
275                } else {
276                    Some(id.to_string())
277                }
278            }
279            _ => None,
280        }
281    } else {
282        None
283    };
284
285    Some(NodeInfo { address, node_id })
286}
287
288#[cfg(test)]
289mod tests {
290    use bytes::Bytes;
291
292    use super::*;
293
294    // ====================================================================
295    // CRC16
296    // ====================================================================
297
298    #[test]
299    fn test_crc16_empty() {
300        assert_eq!(crc16(b""), 0);
301    }
302
303    #[test]
304    fn test_crc16_known_vector() {
305        // Standard CRC16-XMODEM test vector
306        assert_eq!(crc16(b"123456789"), 0x31C3);
307    }
308
309    // ====================================================================
310    // Hash Slot
311    // ====================================================================
312
313    #[test]
314    fn test_hash_slot_range() {
315        let slot = hash_slot(b"somekey");
316        assert!(slot < SLOT_COUNT);
317    }
318
319    #[test]
320    fn test_hash_slot_deterministic() {
321        assert_eq!(hash_slot(b"foo"), hash_slot(b"foo"));
322    }
323
324    #[test]
325    fn test_hash_slot_tag() {
326        // {user}.name and {user}.email should hash to the same slot
327        assert_eq!(hash_slot(b"{user}.name"), hash_slot(b"{user}.email"));
328    }
329
330    #[test]
331    fn test_hash_slot_empty_tag() {
332        // {} is an empty tag — the whole key is hashed
333        let slot_full = crc16(b"{}key") % SLOT_COUNT;
334        assert_eq!(hash_slot(b"{}key"), slot_full);
335    }
336
337    #[test]
338    fn test_hash_slot_nested_braces() {
339        // {{user}} — first { to first } gives "{user"
340        assert_eq!(hash_slot(b"{{user}}"), crc16(b"{user") % SLOT_COUNT);
341    }
342
343    #[test]
344    fn test_hash_slot_no_closing_brace() {
345        // {user without } — whole key is hashed
346        let slot = crc16(b"{user") % SLOT_COUNT;
347        assert_eq!(hash_slot(b"{user"), slot);
348    }
349
350    // ====================================================================
351    // Redirect Parsing
352    // ====================================================================
353
354    #[test]
355    fn test_parse_redirect_moved() {
356        let value = Value::Error(Bytes::from_static(b"MOVED 3999 127.0.0.1:6380"));
357        let r = parse_redirect(&value).unwrap();
358        assert_eq!(r.kind, RedirectKind::Moved);
359        assert_eq!(r.slot, 3999);
360        assert_eq!(r.address, "127.0.0.1:6380");
361    }
362
363    #[test]
364    fn test_parse_redirect_ask() {
365        let value = Value::Error(Bytes::from_static(b"ASK 100 10.0.0.1:7000"));
366        let r = parse_redirect(&value).unwrap();
367        assert_eq!(r.kind, RedirectKind::Ask);
368        assert_eq!(r.slot, 100);
369        assert_eq!(r.address, "10.0.0.1:7000");
370    }
371
372    #[test]
373    fn test_parse_redirect_non_error() {
374        let value = Value::SimpleString(Bytes::from_static(b"OK"));
375        assert!(parse_redirect(&value).is_none());
376    }
377
378    #[test]
379    fn test_parse_redirect_non_redirect_error() {
380        let value = Value::Error(Bytes::from_static(b"ERR unknown command"));
381        assert!(parse_redirect(&value).is_none());
382    }
383
384    #[test]
385    fn test_parse_redirect_malformed_missing_address() {
386        let value = Value::Error(Bytes::from_static(b"MOVED 3999"));
387        assert!(parse_redirect(&value).is_none());
388    }
389
390    #[test]
391    fn test_parse_redirect_malformed_bad_slot() {
392        let value = Value::Error(Bytes::from_static(b"MOVED abc 127.0.0.1:6380"));
393        assert!(parse_redirect(&value).is_none());
394    }
395
396    // ====================================================================
397    // SlotMap
398    // ====================================================================
399
400    fn make_node(ip: &str, port: i64, node_id: Option<&str>) -> Value {
401        let mut arr = vec![
402            Value::BulkString(Bytes::copy_from_slice(ip.as_bytes())),
403            Value::Integer(port),
404        ];
405        if let Some(id) = node_id {
406            arr.push(Value::BulkString(Bytes::copy_from_slice(id.as_bytes())));
407        }
408        Value::Array(arr)
409    }
410
411    #[test]
412    fn test_slot_map_three_nodes() {
413        let resp = Value::Array(vec![
414            Value::Array(vec![
415                Value::Integer(0),
416                Value::Integer(5460),
417                make_node("10.0.0.1", 7000, Some("node1")),
418            ]),
419            Value::Array(vec![
420                Value::Integer(5461),
421                Value::Integer(10922),
422                make_node("10.0.0.2", 7000, Some("node2")),
423            ]),
424            Value::Array(vec![
425                Value::Integer(10923),
426                Value::Integer(16383),
427                make_node("10.0.0.3", 7000, Some("node3")),
428            ]),
429        ]);
430
431        let map = SlotMap::from_cluster_slots(&resp).unwrap();
432        assert_eq!(map.ranges().len(), 3);
433        assert!(!map.is_empty());
434
435        // Boundary lookups
436        let r = map.lookup(0).unwrap();
437        assert_eq!(r.primary.address, "10.0.0.1:7000");
438
439        let r = map.lookup(5460).unwrap();
440        assert_eq!(r.primary.address, "10.0.0.1:7000");
441
442        let r = map.lookup(5461).unwrap();
443        assert_eq!(r.primary.address, "10.0.0.2:7000");
444
445        let r = map.lookup(16383).unwrap();
446        assert_eq!(r.primary.address, "10.0.0.3:7000");
447    }
448
449    #[test]
450    fn test_slot_map_with_replicas() {
451        let resp = Value::Array(vec![Value::Array(vec![
452            Value::Integer(0),
453            Value::Integer(16383),
454            make_node("10.0.0.1", 7000, Some("primary1")),
455            make_node("10.0.0.2", 7001, Some("replica1")),
456            make_node("10.0.0.3", 7002, None),
457        ])]);
458
459        let map = SlotMap::from_cluster_slots(&resp).unwrap();
460        let r = map.lookup(0).unwrap();
461        assert_eq!(r.replicas.len(), 2);
462        assert_eq!(r.replicas[0].address, "10.0.0.2:7001");
463        assert_eq!(r.replicas[0].node_id.as_deref(), Some("replica1"));
464        assert_eq!(r.replicas[1].address, "10.0.0.3:7002");
465        assert!(r.replicas[1].node_id.is_none());
466    }
467
468    #[test]
469    fn test_slot_map_empty_response() {
470        let resp = Value::Array(vec![]);
471        let map = SlotMap::from_cluster_slots(&resp).unwrap();
472        assert!(map.is_empty());
473        assert!(map.lookup(0).is_none());
474    }
475
476    #[test]
477    fn test_slot_map_non_array() {
478        let resp = Value::SimpleString(Bytes::from_static(b"OK"));
479        assert!(SlotMap::from_cluster_slots(&resp).is_none());
480    }
481
482    #[test]
483    fn test_slot_map_lookup_gap() {
484        // Create a map with a gap (slots 100-200 and 300-400)
485        let resp = Value::Array(vec![
486            Value::Array(vec![
487                Value::Integer(100),
488                Value::Integer(200),
489                make_node("10.0.0.1", 7000, None),
490            ]),
491            Value::Array(vec![
492                Value::Integer(300),
493                Value::Integer(400),
494                make_node("10.0.0.2", 7000, None),
495            ]),
496        ]);
497
498        let map = SlotMap::from_cluster_slots(&resp).unwrap();
499        assert!(map.lookup(250).is_none()); // in the gap
500        assert!(map.lookup(50).is_none()); // before first range
501        assert!(map.lookup(500).is_none()); // after last range
502        assert!(map.lookup(150).is_some());
503        assert!(map.lookup(350).is_some());
504    }
505}