Skip to main content

ember_cluster/
slots.rs

1//! Hash slot management for Redis Cluster-compatible key distribution.
2//!
3//! Implements CRC16 hashing (XMODEM polynomial) and 16384-slot mapping
4//! following the Redis Cluster specification.
5
6use crate::NodeId;
7
/// Number of hash slots in the cluster: 2^14 = 16384 (Redis Cluster standard).
pub const SLOT_COUNT: u16 = 1 << 14;
10
/// CRC16 lookup table used for Redis Cluster slot calculation.
///
/// Entry `i` is the CRC16 (CCITT polynomial `0x1021`, processed MSB-first, no
/// reflection) of the single byte `i`. The table is generated at compile time
/// instead of being spelled out; the values are identical to the literal
/// table in the Redis source code (`crc16.c`).
static CRC16_TABLE: [u16; 256] = build_crc16_table();

/// Generates the contents of `CRC16_TABLE` by running the bitwise CRC over
/// every possible byte value.
const fn build_crc16_table() -> [u16; 256] {
    const POLY: u16 = 0x1021;

    let mut table = [0u16; 256];
    let mut byte = 0usize;
    while byte < 256 {
        // Place the byte in the high half of the register, then clock out 8 bits.
        let mut crc = (byte as u16) << 8;
        let mut bit = 0;
        while bit < 8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ POLY
            } else {
                crc << 1
            };
            bit += 1;
        }
        table[byte] = crc;
        byte += 1;
    }
    table
}
48
49/// Computes CRC16 checksum using XMODEM polynomial.
50///
51/// This is the same algorithm Redis uses for slot calculation.
52fn crc16(data: &[u8]) -> u16 {
53    let mut crc: u16 = 0;
54    for &byte in data {
55        let idx = ((crc >> 8) ^ (byte as u16)) as usize;
56        crc = (crc << 8) ^ CRC16_TABLE[idx];
57    }
58    crc
59}
60
/// Returns the part of `key` that should be hashed, honouring hash tags.
///
/// A hash tag lets several keys land in the same slot. The tag is the text
/// between the first `{` and the first `}` that follows it; when that text is
/// non-empty, only the tag is hashed. In every other case the whole key is used.
///
/// Examples:
/// - `user:{123}:profile` → hashes `123`
/// - `{user}:123` → hashes `user`
/// - `foo{}{bar}` → hashes `foo{}{bar}` (empty tag, no match)
/// - `foo{bar` → hashes `foo{bar` (no closing brace)
/// - `foobar` → hashes `foobar` (no tag)
fn extract_hash_tag(key: &[u8]) -> &[u8] {
    // Split once at the first '{': the second piece, when present, is
    // everything after that brace.
    let mut pieces = key.splitn(2, |&b| b == b'{');
    let _before_open = pieces.next();
    let Some(after_open) = pieces.next() else {
        return key;
    };

    // The tag ends at the first '}' after the opening brace. A missing brace
    // or an empty tag falls back to the whole key.
    match after_open.iter().position(|&b| b == b'}') {
        Some(tag_len) if tag_len > 0 => &after_open[..tag_len],
        _ => key,
    }
}
91
92/// Computes the hash slot for a key.
93///
94/// Returns a value in the range [0, 16383].
95pub fn key_slot(key: &[u8]) -> u16 {
96    let hash_input = extract_hash_tag(key);
97    crc16(hash_input) % SLOT_COUNT
98}
99
100/// A contiguous range of slots assigned to a node.
101///
102/// # Invariants
103///
104/// A valid `SlotRange` always satisfies `start <= end`, meaning it contains
105/// at least one slot. This is enforced by debug assertions in the constructor.
106#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
107pub struct SlotRange {
108    pub start: u16,
109    pub end: u16, // inclusive
110}
111
112impl SlotRange {
113    /// Creates a new slot range (end is inclusive).
114    ///
115    /// # Panics
116    ///
117    /// Panics if `start > end` or if `end >= SLOT_COUNT`.
118    pub fn new(start: u16, end: u16) -> Self {
119        assert!(start <= end, "SlotRange requires start <= end");
120        assert!(end < SLOT_COUNT, "slot must be < {SLOT_COUNT}");
121        Self { start, end }
122    }
123
124    /// Creates a new slot range with runtime validation.
125    ///
126    /// Returns an error if `start > end` or `end >= SLOT_COUNT`.
127    /// Use this for untrusted input (e.g. network-decoded data).
128    pub fn try_new(start: u16, end: u16) -> Result<Self, std::io::Error> {
129        if start > end {
130            return Err(std::io::Error::new(
131                std::io::ErrorKind::InvalidData,
132                format!("SlotRange requires start <= end, got {start}..{end}"),
133            ));
134        }
135        if end >= SLOT_COUNT {
136            return Err(std::io::Error::new(
137                std::io::ErrorKind::InvalidData,
138                format!("slot {end} out of range (max {})", SLOT_COUNT - 1),
139            ));
140        }
141        Ok(Self { start, end })
142    }
143
144    /// Creates a range containing a single slot.
145    pub fn single(slot: u16) -> Self {
146        Self::new(slot, slot)
147    }
148
149    /// Returns the number of slots in this range (always >= 1 for valid ranges).
150    #[allow(clippy::len_without_is_empty)]
151    pub fn len(&self) -> u16 {
152        self.end - self.start + 1
153    }
154
155    /// Returns true if this range contains the given slot.
156    pub fn contains(&self, slot: u16) -> bool {
157        slot >= self.start && slot <= self.end
158    }
159
160    /// Returns an iterator over all slots in this range.
161    pub fn iter(&self) -> impl Iterator<Item = u16> {
162        self.start..=self.end
163    }
164}
165
166impl std::fmt::Display for SlotRange {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        if self.start == self.end {
169            write!(f, "{}", self.start)
170        } else {
171            write!(f, "{}-{}", self.start, self.end)
172        }
173    }
174}
175
176/// Maps each of the 16384 slots to a node ID.
177///
178/// When a slot is `None`, it means the slot is not assigned to any node,
179/// which indicates an incomplete cluster configuration.
180#[derive(Debug, Clone)]
181pub struct SlotMap {
182    slots: Box<[Option<NodeId>; SLOT_COUNT as usize]>,
183}
184
185impl Default for SlotMap {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191impl SlotMap {
192    /// Creates an empty slot map with no assignments.
193    pub fn new() -> Self {
194        Self {
195            // Box it to avoid 128KB stack allocation
196            slots: Box::new([None; SLOT_COUNT as usize]),
197        }
198    }
199
200    /// Creates a slot map with all slots assigned to a single node.
201    ///
202    /// Useful for single-node clusters.
203    pub fn single_node(node: NodeId) -> Self {
204        let mut map = Self::new();
205        for slot in map.slots.iter_mut() {
206            *slot = Some(node);
207        }
208        map
209    }
210
211    /// Returns the node that owns the given slot, if assigned.
212    pub fn owner(&self, slot: u16) -> Option<NodeId> {
213        self.slots.get(slot as usize).copied().flatten()
214    }
215
216    /// Assigns a slot to a node.
217    pub fn assign(&mut self, slot: u16, node: NodeId) {
218        if let Some(entry) = self.slots.get_mut(slot as usize) {
219            *entry = Some(node);
220        }
221    }
222
223    /// Assigns a range of slots to a node.
224    pub fn assign_range(&mut self, range: SlotRange, node: NodeId) {
225        for slot in range.iter() {
226            self.assign(slot, node);
227        }
228    }
229
230    /// Clears the assignment for a slot.
231    pub fn unassign(&mut self, slot: u16) {
232        if let Some(entry) = self.slots.get_mut(slot as usize) {
233            *entry = None;
234        }
235    }
236
237    /// Returns true if all slots are assigned to some node.
238    pub fn is_complete(&self) -> bool {
239        self.slots.iter().all(|s| s.is_some())
240    }
241
242    /// Returns the number of unassigned slots.
243    pub fn unassigned_count(&self) -> usize {
244        self.slots.iter().filter(|s| s.is_none()).count()
245    }
246
247    /// Returns all slots owned by a specific node as a list of ranges.
248    ///
249    /// Consecutive slots are merged into ranges for compact representation.
250    pub fn slots_for_node(&self, node: NodeId) -> Vec<SlotRange> {
251        let mut ranges = Vec::new();
252        let mut range_start: Option<u16> = None;
253        let mut prev_slot: Option<u16> = None;
254
255        for (slot_idx, owner) in self.slots.iter().enumerate() {
256            let slot = slot_idx as u16;
257            let owned = *owner == Some(node);
258
259            match (owned, range_start) {
260                (true, None) => {
261                    // Start a new range
262                    range_start = Some(slot);
263                    prev_slot = Some(slot);
264                }
265                (true, Some(_)) => {
266                    // Continue the current range
267                    prev_slot = Some(slot);
268                }
269                (false, Some(start)) => {
270                    // End the current range
271                    if let Some(end) = prev_slot {
272                        ranges.push(SlotRange::new(start, end));
273                    }
274                    range_start = None;
275                    prev_slot = None;
276                }
277                (false, None) => {
278                    // Not in a range, not owned by this node
279                }
280            }
281        }
282
283        // Close any open range at the end
284        if let (Some(start), Some(end)) = (range_start, prev_slot) {
285            ranges.push(SlotRange::new(start, end));
286        }
287
288        ranges
289    }
290
291    /// Returns a count of slots per node.
292    pub fn slot_counts(&self) -> std::collections::HashMap<NodeId, usize> {
293        let mut counts = std::collections::HashMap::new();
294        for owner in self.slots.iter().flatten() {
295            *counts.entry(*owner).or_insert(0) += 1;
296        }
297        counts
298    }
299}
300
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    // Test vectors verified against Redis CLUSTER KEYSLOT command
    #[test]
    fn crc16_matches_redis() {
        // These are known Redis slot assignments
        assert_eq!(key_slot(b""), 0);
        assert_eq!(key_slot(b"foo"), 12182);
        assert_eq!(key_slot(b"bar"), 5061);
        assert_eq!(key_slot(b"hello"), 866);
        // CRC16 CCITT/XMODEM of "123456789" is 0x31C3 = 12739
        assert_eq!(key_slot(b"123456789"), 12739);
    }

    #[test]
    fn hash_tag_extraction() {
        // Basic hash tag
        assert_eq!(key_slot(b"user:{123}:profile"), key_slot(b"123"));
        assert_eq!(key_slot(b"order:{123}:items"), key_slot(b"123"));

        // Tag at start
        assert_eq!(key_slot(b"{user}:123"), key_slot(b"user"));

        // Empty tag uses whole key. The "whole key" cases are checked against
        // the extracted bytes and the raw CRC, because comparing key_slot(k)
        // with itself could never fail.
        assert_eq!(extract_hash_tag(b"foo{}bar"), &b"foo{}bar"[..]);
        assert_eq!(key_slot(b"foo{}bar"), crc16(b"foo{}bar") % SLOT_COUNT);
        assert_eq!(extract_hash_tag(b"foo{}{bar}"), &b"foo{}{bar}"[..]);

        // No closing brace uses whole key
        assert_eq!(extract_hash_tag(b"foo{bar"), &b"foo{bar"[..]);
        assert_eq!(key_slot(b"foo{bar"), crc16(b"foo{bar") % SLOT_COUNT);

        // No braces at all uses whole key
        assert_eq!(extract_hash_tag(b"foobar"), &b"foobar"[..]);

        // Only first tag matters
        assert_eq!(key_slot(b"{a}{b}"), key_slot(b"a"));

        // The tag runs from the first '{' to the first '}' after it
        assert_eq!(extract_hash_tag(b"foo{{bar}}zap"), &b"{bar"[..]);
    }

    #[test]
    fn slot_range_basics() {
        let range = SlotRange::new(0, 5460);
        assert_eq!(range.len(), 5461);
        assert!(range.contains(0));
        assert!(range.contains(5460));
        assert!(!range.contains(5461));

        let single = SlotRange::single(100);
        assert_eq!(single.len(), 1);
        assert!(single.contains(100));
        assert!(!single.contains(99));
        assert!(!single.contains(101));

        assert_eq!(SlotRange::new(3, 6).iter().collect::<Vec<_>>(), vec![3, 4, 5, 6]);
    }

    #[test]
    #[should_panic(expected = "start <= end")]
    fn slot_range_new_rejects_inverted_range() {
        let _ = SlotRange::new(10, 5);
    }

    #[test]
    #[should_panic(expected = "slot must be <")]
    fn slot_range_new_rejects_out_of_range_end() {
        let _ = SlotRange::new(0, SLOT_COUNT);
    }

    #[test]
    fn slot_range_display() {
        assert_eq!(SlotRange::new(0, 5460).to_string(), "0-5460");
        assert_eq!(SlotRange::single(100).to_string(), "100");
    }

    #[test]
    fn slot_map_single_node() {
        let node = NodeId(Uuid::new_v4());
        let map = SlotMap::single_node(node);

        assert!(map.is_complete());
        assert_eq!(map.unassigned_count(), 0);
        assert_eq!(map.owner(0), Some(node));
        assert_eq!(map.owner(SLOT_COUNT - 1), Some(node));

        let ranges = map.slots_for_node(node);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], SlotRange::new(0, SLOT_COUNT - 1));
    }

    #[test]
    fn slot_map_multi_node() {
        let node1 = NodeId(Uuid::new_v4());
        let node2 = NodeId(Uuid::new_v4());
        let node3 = NodeId(Uuid::new_v4());

        let mut map = SlotMap::new();
        assert!(!map.is_complete());
        assert_eq!(map.unassigned_count(), SLOT_COUNT as usize);

        // Assign slots evenly: 0-5460 to node1, 5461-10922 to node2, 10923-16383 to node3
        map.assign_range(SlotRange::new(0, 5460), node1);
        map.assign_range(SlotRange::new(5461, 10922), node2);
        map.assign_range(SlotRange::new(10923, 16383), node3);

        assert!(map.is_complete());

        assert_eq!(map.owner(0), Some(node1));
        assert_eq!(map.owner(5460), Some(node1));
        assert_eq!(map.owner(5461), Some(node2));
        assert_eq!(map.owner(10922), Some(node2));
        assert_eq!(map.owner(10923), Some(node3));
        assert_eq!(map.owner(16383), Some(node3));

        let counts = map.slot_counts();
        assert_eq!(counts.get(&node1), Some(&5461));
        assert_eq!(counts.get(&node2), Some(&5462));
        assert_eq!(counts.get(&node3), Some(&5461));
    }

    #[test]
    fn slot_map_unassign() {
        let node = NodeId(Uuid::new_v4());
        let mut map = SlotMap::single_node(node);

        map.unassign(100);
        assert_eq!(map.owner(100), None);
        assert!(!map.is_complete());
        assert_eq!(map.unassigned_count(), 1);
    }

    #[test]
    fn slot_map_ignores_out_of_range_slots() {
        let node = NodeId(Uuid::new_v4());
        let mut map = SlotMap::new();

        // Slot numbers at or beyond SLOT_COUNT have no entry in the table.
        map.assign(SLOT_COUNT, node);
        map.assign(u16::MAX, node);
        map.unassign(u16::MAX);

        assert_eq!(map.owner(SLOT_COUNT), None);
        assert_eq!(map.owner(u16::MAX), None);
        assert_eq!(map.unassigned_count(), SLOT_COUNT as usize);
        assert!(map.slots_for_node(node).is_empty());
    }

    #[test]
    fn slot_range_try_new_validates() {
        assert!(SlotRange::try_new(0, 5460).is_ok());
        assert!(SlotRange::try_new(100, 100).is_ok());
        // start > end
        assert!(SlotRange::try_new(5000, 100).is_err());
        // end >= SLOT_COUNT
        assert!(SlotRange::try_new(0, 16384).is_err());
        assert!(SlotRange::try_new(0, u16::MAX).is_err());
    }

    #[test]
    fn slots_for_node_ranges() {
        let node = NodeId(Uuid::new_v4());
        let mut map = SlotMap::new();

        // Assign non-contiguous ranges
        map.assign_range(SlotRange::new(0, 10), node);
        map.assign_range(SlotRange::new(100, 110), node);
        map.assign_range(SlotRange::new(200, 200), node); // single slot

        let ranges = map.slots_for_node(node);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges[0], SlotRange::new(0, 10));
        assert_eq!(ranges[1], SlotRange::new(100, 110));
        assert_eq!(ranges[2], SlotRange::new(200, 200));
    }
}