1use crate::Value;
7
8#[rustfmt::skip]
14static CRC16_TABLE: [u16; 256] = [
15 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
16 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
17 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
18 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
19 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
20 0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
21 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
22 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
23 0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
24 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
25 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
26 0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
27 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
28 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
29 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
30 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
31 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
32 0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
33 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
34 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
35 0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
36 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
37 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
38 0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
39 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
40 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
41 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
42 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
43 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
44 0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
45 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
46 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0,
47];
48
49#[inline]
51pub fn crc16(data: &[u8]) -> u16 {
52 let mut crc: u16 = 0;
53 for &byte in data {
54 let index = ((crc >> 8) ^ byte as u16) as u8;
55 crc = (crc << 8) ^ CRC16_TABLE[index as usize];
56 }
57 crc
58}
59
60pub const SLOT_COUNT: u16 = 16384;
66
67#[inline]
72pub fn hash_slot(key: &[u8]) -> u16 {
73 let data = match memchr::memchr(b'{', key) {
74 Some(start) => {
75 let rest = &key[start + 1..];
76 match memchr::memchr(b'}', rest) {
77 Some(0) => key, Some(end) => &rest[..end], None => key, }
81 }
82 None => key,
83 };
84 crc16(data) % SLOT_COUNT
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
93pub enum RedirectKind {
94 Moved,
96 Ask,
98}
99
100#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct Redirect {
103 pub kind: RedirectKind,
104 pub slot: u16,
105 pub address: String,
107}
108
109pub fn parse_redirect(value: &Value) -> Option<Redirect> {
113 let msg = match value {
114 Value::Error(e) => e,
115 _ => return None,
116 };
117
118 let s = std::str::from_utf8(msg).ok()?;
119 let (kind, rest) = if let Some(rest) = s.strip_prefix("MOVED ") {
120 (RedirectKind::Moved, rest)
121 } else if let Some(rest) = s.strip_prefix("ASK ") {
122 (RedirectKind::Ask, rest)
123 } else {
124 return None;
125 };
126
127 let mut parts = rest.splitn(2, ' ');
128 let slot: u16 = parts.next()?.parse().ok()?;
129 let address = parts.next()?;
130 if address.is_empty() {
131 return None;
132 }
133
134 Some(Redirect {
135 kind,
136 slot,
137 address: address.to_string(),
138 })
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct NodeInfo {
148 pub address: String,
150 pub node_id: Option<String>,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct SlotRange {
157 pub start: u16,
159 pub end: u16,
161 pub primary: NodeInfo,
163 pub replicas: Vec<NodeInfo>,
165}
166
167#[derive(Debug, Clone)]
171pub struct SlotMap {
172 ranges: Vec<SlotRange>,
173}
174
175impl SlotMap {
176 pub fn from_cluster_slots(value: &Value) -> Option<Self> {
180 let entries = match value {
181 Value::Array(arr) => arr,
182 _ => return None,
183 };
184
185 let mut ranges = Vec::with_capacity(entries.len());
186
187 for entry in entries {
188 let items = match entry {
189 Value::Array(arr) => arr,
190 _ => return None,
191 };
192
193 if items.len() < 3 {
195 return None;
196 }
197
198 let start = int_value(&items[0])? as u16;
199 let end = int_value(&items[1])? as u16;
200 let primary = parse_node_info(&items[2])?;
201
202 let mut replicas = Vec::with_capacity(items.len().saturating_sub(3));
203 for item in &items[3..] {
204 replicas.push(parse_node_info(item)?);
205 }
206
207 ranges.push(SlotRange {
208 start,
209 end,
210 primary,
211 replicas,
212 });
213 }
214
215 ranges.sort_by_key(|r| r.start);
216
217 Some(SlotMap { ranges })
218 }
219
220 pub fn lookup(&self, slot: u16) -> Option<&SlotRange> {
222 let idx = self
223 .ranges
224 .partition_point(|r| r.start <= slot)
225 .checked_sub(1)?;
226 let range = &self.ranges[idx];
227 if slot <= range.end { Some(range) } else { None }
228 }
229
230 pub fn ranges(&self) -> &[SlotRange] {
232 &self.ranges
233 }
234
235 pub fn is_empty(&self) -> bool {
237 self.ranges.is_empty()
238 }
239}
240
241fn int_value(value: &Value) -> Option<i64> {
243 match value {
244 Value::Integer(n) => Some(*n),
245 Value::BulkString(s) => std::str::from_utf8(s).ok()?.parse().ok(),
246 _ => None,
247 }
248}
249
250fn parse_node_info(value: &Value) -> Option<NodeInfo> {
252 let items = match value {
253 Value::Array(arr) => arr,
254 _ => return None,
255 };
256
257 if items.len() < 2 {
258 return None;
259 }
260
261 let ip = match &items[0] {
262 Value::BulkString(s) => std::str::from_utf8(s).ok()?,
263 _ => return None,
264 };
265 let port = int_value(&items[1])?;
266
267 let address = format!("{}:{}", ip, port);
268
269 let node_id = if items.len() >= 3 {
270 match &items[2] {
271 Value::BulkString(s) => {
272 let id = std::str::from_utf8(s).ok()?;
273 if id.is_empty() {
274 None
275 } else {
276 Some(id.to_string())
277 }
278 }
279 _ => None,
280 }
281 } else {
282 None
283 };
284
285 Some(NodeInfo { address, node_id })
286}
287
288#[cfg(test)]
289mod tests {
290 use bytes::Bytes;
291
292 use super::*;
293
294 #[test]
299 fn test_crc16_empty() {
300 assert_eq!(crc16(b""), 0);
301 }
302
303 #[test]
304 fn test_crc16_known_vector() {
305 assert_eq!(crc16(b"123456789"), 0x31C3);
307 }
308
309 #[test]
314 fn test_hash_slot_range() {
315 let slot = hash_slot(b"somekey");
316 assert!(slot < SLOT_COUNT);
317 }
318
319 #[test]
320 fn test_hash_slot_deterministic() {
321 assert_eq!(hash_slot(b"foo"), hash_slot(b"foo"));
322 }
323
324 #[test]
325 fn test_hash_slot_tag() {
326 assert_eq!(hash_slot(b"{user}.name"), hash_slot(b"{user}.email"));
328 }
329
330 #[test]
331 fn test_hash_slot_empty_tag() {
332 let slot_full = crc16(b"{}key") % SLOT_COUNT;
334 assert_eq!(hash_slot(b"{}key"), slot_full);
335 }
336
337 #[test]
338 fn test_hash_slot_nested_braces() {
339 assert_eq!(hash_slot(b"{{user}}"), crc16(b"{user") % SLOT_COUNT);
341 }
342
343 #[test]
344 fn test_hash_slot_no_closing_brace() {
345 let slot = crc16(b"{user") % SLOT_COUNT;
347 assert_eq!(hash_slot(b"{user"), slot);
348 }
349
350 #[test]
355 fn test_parse_redirect_moved() {
356 let value = Value::Error(Bytes::from_static(b"MOVED 3999 127.0.0.1:6380"));
357 let r = parse_redirect(&value).unwrap();
358 assert_eq!(r.kind, RedirectKind::Moved);
359 assert_eq!(r.slot, 3999);
360 assert_eq!(r.address, "127.0.0.1:6380");
361 }
362
363 #[test]
364 fn test_parse_redirect_ask() {
365 let value = Value::Error(Bytes::from_static(b"ASK 100 10.0.0.1:7000"));
366 let r = parse_redirect(&value).unwrap();
367 assert_eq!(r.kind, RedirectKind::Ask);
368 assert_eq!(r.slot, 100);
369 assert_eq!(r.address, "10.0.0.1:7000");
370 }
371
372 #[test]
373 fn test_parse_redirect_non_error() {
374 let value = Value::SimpleString(Bytes::from_static(b"OK"));
375 assert!(parse_redirect(&value).is_none());
376 }
377
378 #[test]
379 fn test_parse_redirect_non_redirect_error() {
380 let value = Value::Error(Bytes::from_static(b"ERR unknown command"));
381 assert!(parse_redirect(&value).is_none());
382 }
383
384 #[test]
385 fn test_parse_redirect_malformed_missing_address() {
386 let value = Value::Error(Bytes::from_static(b"MOVED 3999"));
387 assert!(parse_redirect(&value).is_none());
388 }
389
390 #[test]
391 fn test_parse_redirect_malformed_bad_slot() {
392 let value = Value::Error(Bytes::from_static(b"MOVED abc 127.0.0.1:6380"));
393 assert!(parse_redirect(&value).is_none());
394 }
395
396 fn make_node(ip: &str, port: i64, node_id: Option<&str>) -> Value {
401 let mut arr = vec![
402 Value::BulkString(Bytes::copy_from_slice(ip.as_bytes())),
403 Value::Integer(port),
404 ];
405 if let Some(id) = node_id {
406 arr.push(Value::BulkString(Bytes::copy_from_slice(id.as_bytes())));
407 }
408 Value::Array(arr)
409 }
410
411 #[test]
412 fn test_slot_map_three_nodes() {
413 let resp = Value::Array(vec![
414 Value::Array(vec![
415 Value::Integer(0),
416 Value::Integer(5460),
417 make_node("10.0.0.1", 7000, Some("node1")),
418 ]),
419 Value::Array(vec![
420 Value::Integer(5461),
421 Value::Integer(10922),
422 make_node("10.0.0.2", 7000, Some("node2")),
423 ]),
424 Value::Array(vec![
425 Value::Integer(10923),
426 Value::Integer(16383),
427 make_node("10.0.0.3", 7000, Some("node3")),
428 ]),
429 ]);
430
431 let map = SlotMap::from_cluster_slots(&resp).unwrap();
432 assert_eq!(map.ranges().len(), 3);
433 assert!(!map.is_empty());
434
435 let r = map.lookup(0).unwrap();
437 assert_eq!(r.primary.address, "10.0.0.1:7000");
438
439 let r = map.lookup(5460).unwrap();
440 assert_eq!(r.primary.address, "10.0.0.1:7000");
441
442 let r = map.lookup(5461).unwrap();
443 assert_eq!(r.primary.address, "10.0.0.2:7000");
444
445 let r = map.lookup(16383).unwrap();
446 assert_eq!(r.primary.address, "10.0.0.3:7000");
447 }
448
449 #[test]
450 fn test_slot_map_with_replicas() {
451 let resp = Value::Array(vec![Value::Array(vec![
452 Value::Integer(0),
453 Value::Integer(16383),
454 make_node("10.0.0.1", 7000, Some("primary1")),
455 make_node("10.0.0.2", 7001, Some("replica1")),
456 make_node("10.0.0.3", 7002, None),
457 ])]);
458
459 let map = SlotMap::from_cluster_slots(&resp).unwrap();
460 let r = map.lookup(0).unwrap();
461 assert_eq!(r.replicas.len(), 2);
462 assert_eq!(r.replicas[0].address, "10.0.0.2:7001");
463 assert_eq!(r.replicas[0].node_id.as_deref(), Some("replica1"));
464 assert_eq!(r.replicas[1].address, "10.0.0.3:7002");
465 assert!(r.replicas[1].node_id.is_none());
466 }
467
468 #[test]
469 fn test_slot_map_empty_response() {
470 let resp = Value::Array(vec![]);
471 let map = SlotMap::from_cluster_slots(&resp).unwrap();
472 assert!(map.is_empty());
473 assert!(map.lookup(0).is_none());
474 }
475
476 #[test]
477 fn test_slot_map_non_array() {
478 let resp = Value::SimpleString(Bytes::from_static(b"OK"));
479 assert!(SlotMap::from_cluster_slots(&resp).is_none());
480 }
481
482 #[test]
483 fn test_slot_map_lookup_gap() {
484 let resp = Value::Array(vec![
486 Value::Array(vec![
487 Value::Integer(100),
488 Value::Integer(200),
489 make_node("10.0.0.1", 7000, None),
490 ]),
491 Value::Array(vec![
492 Value::Integer(300),
493 Value::Integer(400),
494 make_node("10.0.0.2", 7000, None),
495 ]),
496 ]);
497
498 let map = SlotMap::from_cluster_slots(&resp).unwrap();
499 assert!(map.lookup(250).is_none()); assert!(map.lookup(50).is_none()); assert!(map.lookup(500).is_none()); assert!(map.lookup(150).is_some());
503 assert!(map.lookup(350).is_some());
504 }
505}