1use crate::NodeId;
7
8pub const SLOT_COUNT: u16 = 16384;
10
11#[rustfmt::skip]
14static CRC16_TABLE: [u16; 256] = [
15 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
16 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
17 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
18 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
19 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
20 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
21 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
22 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
23 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
24 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
25 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
26 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
27 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
28 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
29 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
30 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
31 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
32 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
33 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
34 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
35 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
36 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
37 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
38 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
39 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
40 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
41 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
42 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
43 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
44 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
45 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
46 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
47];
48
49fn crc16(data: &[u8]) -> u16 {
53 let mut crc: u16 = 0;
54 for &byte in data {
55 let idx = ((crc >> 8) ^ (byte as u16)) as usize;
56 crc = (crc << 8) ^ CRC16_TABLE[idx];
57 }
58 crc
59}
60
61fn extract_hash_tag(key: &[u8]) -> &[u8] {
73 let Some(open) = key.iter().position(|&b| b == b'{') else {
75 return key;
76 };
77
78 let after_open = &key[open + 1..];
80 let Some(close) = after_open.iter().position(|&b| b == b'}') else {
81 return key;
82 };
83
84 if close == 0 {
86 return key;
87 }
88
89 &after_open[..close]
90}
91
92pub fn key_slot(key: &[u8]) -> u16 {
96 let hash_input = extract_hash_tag(key);
97 crc16(hash_input) % SLOT_COUNT
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
107pub struct SlotRange {
108 pub start: u16,
109 pub end: u16, }
111
112impl SlotRange {
113 pub fn new(start: u16, end: u16) -> Self {
119 assert!(start <= end, "SlotRange requires start <= end");
120 assert!(end < SLOT_COUNT, "slot must be < {SLOT_COUNT}");
121 Self { start, end }
122 }
123
124 pub fn try_new(start: u16, end: u16) -> Result<Self, std::io::Error> {
129 if start > end {
130 return Err(std::io::Error::new(
131 std::io::ErrorKind::InvalidData,
132 format!("SlotRange requires start <= end, got {start}..{end}"),
133 ));
134 }
135 if end >= SLOT_COUNT {
136 return Err(std::io::Error::new(
137 std::io::ErrorKind::InvalidData,
138 format!("slot {end} out of range (max {})", SLOT_COUNT - 1),
139 ));
140 }
141 Ok(Self { start, end })
142 }
143
144 pub fn single(slot: u16) -> Self {
146 Self::new(slot, slot)
147 }
148
149 #[allow(clippy::len_without_is_empty)]
151 pub fn len(&self) -> u16 {
152 self.end - self.start + 1
153 }
154
155 pub fn contains(&self, slot: u16) -> bool {
157 slot >= self.start && slot <= self.end
158 }
159
160 pub fn iter(&self) -> impl Iterator<Item = u16> {
162 self.start..=self.end
163 }
164}
165
166impl std::fmt::Display for SlotRange {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 if self.start == self.end {
169 write!(f, "{}", self.start)
170 } else {
171 write!(f, "{}-{}", self.start, self.end)
172 }
173 }
174}
175
176#[derive(Debug, Clone)]
181pub struct SlotMap {
182 slots: Box<[Option<NodeId>; SLOT_COUNT as usize]>,
183}
184
185impl Default for SlotMap {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191impl SlotMap {
192 pub fn new() -> Self {
194 Self {
195 slots: Box::new([None; SLOT_COUNT as usize]),
197 }
198 }
199
200 pub fn single_node(node: NodeId) -> Self {
204 let mut map = Self::new();
205 for slot in map.slots.iter_mut() {
206 *slot = Some(node);
207 }
208 map
209 }
210
211 pub fn owner(&self, slot: u16) -> Option<NodeId> {
213 self.slots.get(slot as usize).copied().flatten()
214 }
215
216 pub fn assign(&mut self, slot: u16, node: NodeId) {
218 if let Some(entry) = self.slots.get_mut(slot as usize) {
219 *entry = Some(node);
220 }
221 }
222
223 pub fn assign_range(&mut self, range: SlotRange, node: NodeId) {
225 for slot in range.iter() {
226 self.assign(slot, node);
227 }
228 }
229
230 pub fn unassign(&mut self, slot: u16) {
232 if let Some(entry) = self.slots.get_mut(slot as usize) {
233 *entry = None;
234 }
235 }
236
237 pub fn is_complete(&self) -> bool {
239 self.slots.iter().all(|s| s.is_some())
240 }
241
242 pub fn unassigned_count(&self) -> usize {
244 self.slots.iter().filter(|s| s.is_none()).count()
245 }
246
247 pub fn slots_for_node(&self, node: NodeId) -> Vec<SlotRange> {
251 let mut ranges = Vec::new();
252 let mut range_start: Option<u16> = None;
253 let mut prev_slot: Option<u16> = None;
254
255 for (slot_idx, owner) in self.slots.iter().enumerate() {
256 let slot = slot_idx as u16;
257 let owned = *owner == Some(node);
258
259 match (owned, range_start) {
260 (true, None) => {
261 range_start = Some(slot);
263 prev_slot = Some(slot);
264 }
265 (true, Some(_)) => {
266 prev_slot = Some(slot);
268 }
269 (false, Some(start)) => {
270 if let Some(end) = prev_slot {
272 ranges.push(SlotRange::new(start, end));
273 }
274 range_start = None;
275 prev_slot = None;
276 }
277 (false, None) => {
278 }
280 }
281 }
282
283 if let (Some(start), Some(end)) = (range_start, prev_slot) {
285 ranges.push(SlotRange::new(start, end));
286 }
287
288 ranges
289 }
290
291 pub fn slot_counts(&self) -> std::collections::HashMap<NodeId, usize> {
293 let mut counts = std::collections::HashMap::new();
294 for owner in self.slots.iter().flatten() {
295 *counts.entry(*owner).or_insert(0) += 1;
296 }
297 counts
298 }
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304 use uuid::Uuid;
305
306 #[test]
308 fn crc16_matches_redis() {
309 assert_eq!(key_slot(b""), 0);
311 assert_eq!(key_slot(b"foo"), 12182);
312 assert_eq!(key_slot(b"bar"), 5061);
313 assert_eq!(key_slot(b"hello"), 866);
314 assert_eq!(key_slot(b"123456789"), 12739);
316 }
317
318 #[test]
319 fn hash_tag_extraction() {
320 assert_eq!(key_slot(b"user:{123}:profile"), key_slot(b"123"));
322 assert_eq!(key_slot(b"order:{123}:items"), key_slot(b"123"));
323
324 assert_eq!(key_slot(b"{user}:123"), key_slot(b"user"));
326
327 assert_eq!(key_slot(b"foo{}bar"), key_slot(b"foo{}bar"));
329
330 assert_eq!(key_slot(b"foo{bar"), key_slot(b"foo{bar"));
332
333 assert_eq!(key_slot(b"{a}{b}"), key_slot(b"a"));
335 }
336
337 #[test]
338 fn slot_range_basics() {
339 let range = SlotRange::new(0, 5460);
340 assert_eq!(range.len(), 5461);
341 assert!(range.contains(0));
342 assert!(range.contains(5460));
343 assert!(!range.contains(5461));
344
345 let single = SlotRange::single(100);
346 assert_eq!(single.len(), 1);
347 assert!(single.contains(100));
348 assert!(!single.contains(99));
349 assert!(!single.contains(101));
350 }
351
352 #[test]
353 fn slot_range_display() {
354 assert_eq!(SlotRange::new(0, 5460).to_string(), "0-5460");
355 assert_eq!(SlotRange::single(100).to_string(), "100");
356 }
357
358 #[test]
359 fn slot_map_single_node() {
360 let node = NodeId(Uuid::new_v4());
361 let map = SlotMap::single_node(node);
362
363 assert!(map.is_complete());
364 assert_eq!(map.unassigned_count(), 0);
365 assert_eq!(map.owner(0), Some(node));
366 assert_eq!(map.owner(SLOT_COUNT - 1), Some(node));
367
368 let ranges = map.slots_for_node(node);
369 assert_eq!(ranges.len(), 1);
370 assert_eq!(ranges[0], SlotRange::new(0, SLOT_COUNT - 1));
371 }
372
373 #[test]
374 fn slot_map_multi_node() {
375 let node1 = NodeId(Uuid::new_v4());
376 let node2 = NodeId(Uuid::new_v4());
377 let node3 = NodeId(Uuid::new_v4());
378
379 let mut map = SlotMap::new();
380 assert!(!map.is_complete());
381 assert_eq!(map.unassigned_count(), SLOT_COUNT as usize);
382
383 map.assign_range(SlotRange::new(0, 5460), node1);
385 map.assign_range(SlotRange::new(5461, 10922), node2);
386 map.assign_range(SlotRange::new(10923, 16383), node3);
387
388 assert!(map.is_complete());
389
390 assert_eq!(map.owner(0), Some(node1));
391 assert_eq!(map.owner(5460), Some(node1));
392 assert_eq!(map.owner(5461), Some(node2));
393 assert_eq!(map.owner(10922), Some(node2));
394 assert_eq!(map.owner(10923), Some(node3));
395 assert_eq!(map.owner(16383), Some(node3));
396
397 let counts = map.slot_counts();
398 assert_eq!(counts.get(&node1), Some(&5461));
399 assert_eq!(counts.get(&node2), Some(&5462));
400 assert_eq!(counts.get(&node3), Some(&5461));
401 }
402
403 #[test]
404 fn slot_map_unassign() {
405 let node = NodeId(Uuid::new_v4());
406 let mut map = SlotMap::single_node(node);
407
408 map.unassign(100);
409 assert_eq!(map.owner(100), None);
410 assert!(!map.is_complete());
411 assert_eq!(map.unassigned_count(), 1);
412 }
413
414 #[test]
415 fn slot_range_try_new_validates() {
416 assert!(SlotRange::try_new(0, 5460).is_ok());
417 assert!(SlotRange::try_new(100, 100).is_ok());
418 assert!(SlotRange::try_new(5000, 100).is_err());
420 assert!(SlotRange::try_new(0, 16384).is_err());
422 assert!(SlotRange::try_new(0, u16::MAX).is_err());
423 }
424
425 #[test]
426 fn slots_for_node_ranges() {
427 let node = NodeId(Uuid::new_v4());
428 let mut map = SlotMap::new();
429
430 map.assign_range(SlotRange::new(0, 10), node);
432 map.assign_range(SlotRange::new(100, 110), node);
433 map.assign_range(SlotRange::new(200, 200), node); let ranges = map.slots_for_node(node);
436 assert_eq!(ranges.len(), 3);
437 assert_eq!(ranges[0], SlotRange::new(0, 10));
438 assert_eq!(ranges[1], SlotRange::new(100, 110));
439 assert_eq!(ranges[2], SlotRange::new(200, 200));
440 }
441}