Skip to main content

ember_cluster/
slots.rs

1//! Hash slot management for Redis Cluster-compatible key distribution.
2//!
3//! Implements CRC16 hashing (XMODEM polynomial) and 16384-slot mapping
4//! following the Redis Cluster specification.
5
6use crate::NodeId;
7
8/// Total number of hash slots in the cluster (Redis Cluster standard).
9pub const SLOT_COUNT: u16 = 16384;
10
11/// CRC16 lookup table from Redis source code (crc16.c).
12/// Uses CCITT polynomial for Redis Cluster slot calculation.
13#[rustfmt::skip]
14static CRC16_TABLE: [u16; 256] = [
15    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
16    0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
17    0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
18    0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
19    0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
20    0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
21    0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
22    0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
23    0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
24    0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
25    0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
26    0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
27    0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
28    0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
29    0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
30    0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
31    0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
32    0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
33    0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
34    0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
35    0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
36    0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
37    0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
38    0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
39    0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
40    0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
41    0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
42    0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
43    0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
44    0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
45    0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
46    0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
47];
48
49/// Computes CRC16 checksum using XMODEM polynomial.
50///
51/// This is the same algorithm Redis uses for slot calculation.
52fn crc16(data: &[u8]) -> u16 {
53    let mut crc: u16 = 0;
54    for &byte in data {
55        let idx = ((crc >> 8) ^ (byte as u16)) as usize;
56        crc = (crc << 8) ^ CRC16_TABLE[idx];
57    }
58    crc
59}
60
61/// Extracts the hashable portion of a key, handling hash tags.
62///
63/// Hash tags allow multiple keys to be assigned to the same slot.
64/// The tag is the content between the first `{` and the first `}` after it.
65///
66/// Examples:
67/// - `user:{123}:profile` → hashes `123`
68/// - `{user}:123` → hashes `user`
69/// - `foo{}{bar}` → hashes `foo{}{bar}` (empty tag, no match)
70/// - `foo{bar` → hashes `foo{bar` (no closing brace)
71/// - `foobar` → hashes `foobar` (no tag)
72fn extract_hash_tag(key: &[u8]) -> &[u8] {
73    // Find first '{'
74    let Some(open) = key.iter().position(|&b| b == b'{') else {
75        return key;
76    };
77
78    // Find first '}' after the '{'
79    let after_open = &key[open + 1..];
80    let Some(close) = after_open.iter().position(|&b| b == b'}') else {
81        return key;
82    };
83
84    // Empty tag (e.g., "foo{}bar") means use the whole key
85    if close == 0 {
86        return key;
87    }
88
89    &after_open[..close]
90}
91
92/// Computes the hash slot for a key.
93///
94/// Returns a value in the range [0, 16383].
95pub fn key_slot(key: &[u8]) -> u16 {
96    let hash_input = extract_hash_tag(key);
97    crc16(hash_input) % SLOT_COUNT
98}
99
100/// A contiguous range of slots assigned to a node.
101///
102/// # Invariants
103///
104/// A valid `SlotRange` always satisfies `start <= end`, meaning it contains
105/// at least one slot. This is enforced by debug assertions in the constructor.
106#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
107pub struct SlotRange {
108    pub start: u16,
109    pub end: u16, // inclusive
110}
111
112impl SlotRange {
113    /// Creates a new slot range (end is inclusive).
114    ///
115    /// # Panics
116    ///
117    /// Debug-panics if `start > end` or if `end >= SLOT_COUNT`.
118    pub fn new(start: u16, end: u16) -> Self {
119        debug_assert!(start <= end, "SlotRange requires start <= end");
120        debug_assert!(end < SLOT_COUNT, "slot must be < {SLOT_COUNT}");
121        Self { start, end }
122    }
123
124    /// Creates a range containing a single slot.
125    pub fn single(slot: u16) -> Self {
126        Self::new(slot, slot)
127    }
128
129    /// Returns the number of slots in this range (always >= 1 for valid ranges).
130    #[allow(clippy::len_without_is_empty)]
131    pub fn len(&self) -> u16 {
132        self.end - self.start + 1
133    }
134
135    /// Returns true if this range contains the given slot.
136    pub fn contains(&self, slot: u16) -> bool {
137        slot >= self.start && slot <= self.end
138    }
139
140    /// Returns an iterator over all slots in this range.
141    pub fn iter(&self) -> impl Iterator<Item = u16> {
142        self.start..=self.end
143    }
144}
145
146impl std::fmt::Display for SlotRange {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        if self.start == self.end {
149            write!(f, "{}", self.start)
150        } else {
151            write!(f, "{}-{}", self.start, self.end)
152        }
153    }
154}
155
156/// Maps each of the 16384 slots to a node ID.
157///
158/// When a slot is `None`, it means the slot is not assigned to any node,
159/// which indicates an incomplete cluster configuration.
160#[derive(Debug, Clone)]
161pub struct SlotMap {
162    slots: Box<[Option<NodeId>; SLOT_COUNT as usize]>,
163}
164
165impl Default for SlotMap {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171impl SlotMap {
172    /// Creates an empty slot map with no assignments.
173    pub fn new() -> Self {
174        Self {
175            // Box it to avoid 128KB stack allocation
176            slots: Box::new([None; SLOT_COUNT as usize]),
177        }
178    }
179
180    /// Creates a slot map with all slots assigned to a single node.
181    ///
182    /// Useful for single-node clusters.
183    pub fn single_node(node: NodeId) -> Self {
184        let mut map = Self::new();
185        for slot in map.slots.iter_mut() {
186            *slot = Some(node);
187        }
188        map
189    }
190
191    /// Returns the node that owns the given slot, if assigned.
192    pub fn owner(&self, slot: u16) -> Option<NodeId> {
193        self.slots.get(slot as usize).copied().flatten()
194    }
195
196    /// Assigns a slot to a node.
197    pub fn assign(&mut self, slot: u16, node: NodeId) {
198        if let Some(entry) = self.slots.get_mut(slot as usize) {
199            *entry = Some(node);
200        }
201    }
202
203    /// Assigns a range of slots to a node.
204    pub fn assign_range(&mut self, range: SlotRange, node: NodeId) {
205        for slot in range.iter() {
206            self.assign(slot, node);
207        }
208    }
209
210    /// Clears the assignment for a slot.
211    pub fn unassign(&mut self, slot: u16) {
212        if let Some(entry) = self.slots.get_mut(slot as usize) {
213            *entry = None;
214        }
215    }
216
217    /// Returns true if all slots are assigned to some node.
218    pub fn is_complete(&self) -> bool {
219        self.slots.iter().all(|s| s.is_some())
220    }
221
222    /// Returns the number of unassigned slots.
223    pub fn unassigned_count(&self) -> usize {
224        self.slots.iter().filter(|s| s.is_none()).count()
225    }
226
227    /// Returns all slots owned by a specific node as a list of ranges.
228    ///
229    /// Consecutive slots are merged into ranges for compact representation.
230    pub fn slots_for_node(&self, node: NodeId) -> Vec<SlotRange> {
231        let mut ranges = Vec::new();
232        let mut range_start: Option<u16> = None;
233        let mut prev_slot: Option<u16> = None;
234
235        for (slot_idx, owner) in self.slots.iter().enumerate() {
236            let slot = slot_idx as u16;
237            let owned = *owner == Some(node);
238
239            match (owned, range_start) {
240                (true, None) => {
241                    // Start a new range
242                    range_start = Some(slot);
243                    prev_slot = Some(slot);
244                }
245                (true, Some(_)) => {
246                    // Continue the current range
247                    prev_slot = Some(slot);
248                }
249                (false, Some(start)) => {
250                    // End the current range
251                    if let Some(end) = prev_slot {
252                        ranges.push(SlotRange::new(start, end));
253                    }
254                    range_start = None;
255                    prev_slot = None;
256                }
257                (false, None) => {
258                    // Not in a range, not owned by this node
259                }
260            }
261        }
262
263        // Close any open range at the end
264        if let (Some(start), Some(end)) = (range_start, prev_slot) {
265            ranges.push(SlotRange::new(start, end));
266        }
267
268        ranges
269    }
270
271    /// Returns a count of slots per node.
272    pub fn slot_counts(&self) -> std::collections::HashMap<NodeId, usize> {
273        let mut counts = std::collections::HashMap::new();
274        for owner in self.slots.iter().flatten() {
275            *counts.entry(*owner).or_insert(0) += 1;
276        }
277        counts
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use super::*;
284    use uuid::Uuid;
285
286    // Test vectors verified against Redis CLUSTER KEYSLOT command
287    #[test]
288    fn crc16_matches_redis() {
289        // These are known Redis slot assignments
290        assert_eq!(key_slot(b""), 0);
291        assert_eq!(key_slot(b"foo"), 12182);
292        assert_eq!(key_slot(b"bar"), 5061);
293        assert_eq!(key_slot(b"hello"), 866);
294        // CRC16 CCITT/XMODEM of "123456789" is 0x31C3 = 12739
295        assert_eq!(key_slot(b"123456789"), 12739);
296    }
297
298    #[test]
299    fn hash_tag_extraction() {
300        // Basic hash tag
301        assert_eq!(key_slot(b"user:{123}:profile"), key_slot(b"123"));
302        assert_eq!(key_slot(b"order:{123}:items"), key_slot(b"123"));
303
304        // Tag at start
305        assert_eq!(key_slot(b"{user}:123"), key_slot(b"user"));
306
307        // Empty tag uses whole key
308        assert_eq!(key_slot(b"foo{}bar"), key_slot(b"foo{}bar"));
309
310        // No closing brace uses whole key
311        assert_eq!(key_slot(b"foo{bar"), key_slot(b"foo{bar"));
312
313        // Only first tag matters
314        assert_eq!(key_slot(b"{a}{b}"), key_slot(b"a"));
315    }
316
317    #[test]
318    fn slot_range_basics() {
319        let range = SlotRange::new(0, 5460);
320        assert_eq!(range.len(), 5461);
321        assert!(range.contains(0));
322        assert!(range.contains(5460));
323        assert!(!range.contains(5461));
324
325        let single = SlotRange::single(100);
326        assert_eq!(single.len(), 1);
327        assert!(single.contains(100));
328        assert!(!single.contains(99));
329        assert!(!single.contains(101));
330    }
331
332    #[test]
333    fn slot_range_display() {
334        assert_eq!(SlotRange::new(0, 5460).to_string(), "0-5460");
335        assert_eq!(SlotRange::single(100).to_string(), "100");
336    }
337
338    #[test]
339    fn slot_map_single_node() {
340        let node = NodeId(Uuid::new_v4());
341        let map = SlotMap::single_node(node);
342
343        assert!(map.is_complete());
344        assert_eq!(map.unassigned_count(), 0);
345        assert_eq!(map.owner(0), Some(node));
346        assert_eq!(map.owner(SLOT_COUNT - 1), Some(node));
347
348        let ranges = map.slots_for_node(node);
349        assert_eq!(ranges.len(), 1);
350        assert_eq!(ranges[0], SlotRange::new(0, SLOT_COUNT - 1));
351    }
352
353    #[test]
354    fn slot_map_multi_node() {
355        let node1 = NodeId(Uuid::new_v4());
356        let node2 = NodeId(Uuid::new_v4());
357        let node3 = NodeId(Uuid::new_v4());
358
359        let mut map = SlotMap::new();
360        assert!(!map.is_complete());
361        assert_eq!(map.unassigned_count(), SLOT_COUNT as usize);
362
363        // Assign slots evenly: 0-5460 to node1, 5461-10922 to node2, 10923-16383 to node3
364        map.assign_range(SlotRange::new(0, 5460), node1);
365        map.assign_range(SlotRange::new(5461, 10922), node2);
366        map.assign_range(SlotRange::new(10923, 16383), node3);
367
368        assert!(map.is_complete());
369
370        assert_eq!(map.owner(0), Some(node1));
371        assert_eq!(map.owner(5460), Some(node1));
372        assert_eq!(map.owner(5461), Some(node2));
373        assert_eq!(map.owner(10922), Some(node2));
374        assert_eq!(map.owner(10923), Some(node3));
375        assert_eq!(map.owner(16383), Some(node3));
376
377        let counts = map.slot_counts();
378        assert_eq!(counts.get(&node1), Some(&5461));
379        assert_eq!(counts.get(&node2), Some(&5462));
380        assert_eq!(counts.get(&node3), Some(&5461));
381    }
382
383    #[test]
384    fn slot_map_unassign() {
385        let node = NodeId(Uuid::new_v4());
386        let mut map = SlotMap::single_node(node);
387
388        map.unassign(100);
389        assert_eq!(map.owner(100), None);
390        assert!(!map.is_complete());
391        assert_eq!(map.unassigned_count(), 1);
392    }
393
394    #[test]
395    fn slots_for_node_ranges() {
396        let node = NodeId(Uuid::new_v4());
397        let mut map = SlotMap::new();
398
399        // Assign non-contiguous ranges
400        map.assign_range(SlotRange::new(0, 10), node);
401        map.assign_range(SlotRange::new(100, 110), node);
402        map.assign_range(SlotRange::new(200, 200), node); // single slot
403
404        let ranges = map.slots_for_node(node);
405        assert_eq!(ranges.len(), 3);
406        assert_eq!(ranges[0], SlotRange::new(0, 10));
407        assert_eq!(ranges[1], SlotRange::new(100, 110));
408        assert_eq!(ranges[2], SlotRange::new(200, 200));
409    }
410}