redis_protocol/
utils.rs

1use crate::nom_bytes::NomBytes;
2use crate::resp2::types::Frame as Resp2Frame;
3use crate::resp3::types::Frame as Resp3Frame;
4use crate::types::*;
5use bytes::BytesMut;
6use bytes_utils::Str;
7use cookie_factory::GenError;
8use crc16::{State, XMODEM};
9use nom::error::ErrorKind as NomErrorKind;
10use alloc::borrow::ToOwned;
11use alloc::vec::Vec;
12use core::str;
13
/// Number of bytes in one kilobyte.
pub const KB: usize = 1024;
/// A pre-defined zeroed out KB of data, used to speed up extending buffers while encoding.
pub const ZEROED_KB: &[u8; KB] = &[0; KB];

/// Number of hash slots that key checksums are reduced onto when computing a cluster key slot.
pub const REDIS_CLUSTER_SLOTS: u16 = 16384;

/// Prefix on normal pubsub messages.
///
/// In RESP2 this may be the first inner frame, in RESP3 it may be the second inner frame.
pub const PUBSUB_PREFIX: &str = "message";
/// Prefix on pubsub messages from a pattern matching subscription.
///
/// In RESP2 this may be the first inner frame, in RESP3 it may be the second inner frame.
pub const PATTERN_PUBSUB_PREFIX: &str = "pmessage";
/// Prefix on RESP3 push pubsub messages.
///
/// In RESP3 this is the first inner frame in a push pubsub message.
pub const PUBSUB_PUSH_PREFIX: &str = "pubsub";
32
/// Unwrap an `Option`, or return `None` from the enclosing function when it is empty.
macro_rules! unwrap_return {
  ($expr:expr) => {
    if let Some(inner) = $expr {
      inner
    } else {
      return None;
    }
  };
}
41
/// Guard used at the top of encoding functions that write into a `(buffer, offset)` tuple `$x`.
///
/// Propagates the error from `check_offset` when the offset is invalid, and returns
/// `GenError::BufferTooSmall` carrying the number of missing bytes when fewer than `$required`
/// bytes remain after the offset.
macro_rules! encode_checks {
  ($x:ident, $required:expr) => {
    crate::utils::check_offset(&$x)?;
    let needed = $required;
    // cannot underflow: `check_offset` just verified that the offset is within the buffer
    let available = $x.0.len() - $x.1;

    if needed > available {
      return Err(cookie_factory::GenError::BufferTooSmall(needed - available));
    }
  };
}
53
/// Return early with `$err` converted to a `RedisParseError` and then to a nom error.
macro_rules! e {
  ($err:expr) => {
    return Err(RedisParseError::from($err).into_nom_error())
  };
}
59
/// Unwrap a `Result`, or return early with its error converted to a `RedisParseError` and then
/// to a nom error.
macro_rules! etry {
  ($expr:expr) => {
    match $expr {
      Err(cause) => return Err(RedisParseError::from(cause).into_nom_error()),
      Ok(value) => value,
    }
  };
}
68
/// Emit a trace-level log line while decoding. Only does work when trace logging is enabled.
///
/// * `decode_log!(buf, fmt...)` - if `buf` is valid UTF-8 it is shadowed by its `&str` form
///   while the format arguments are evaluated, otherwise the arguments see `buf` unchanged.
/// * `decode_log!(expr, name, fmt...)` - same, but the `&str` form is bound to `name`.
/// * `decode_log!(fmt...)` - plain `trace!` call.
#[cfg(feature = "decode-logs")]
macro_rules! decode_log {
  ($buf:ident, $($arg:tt)*) => {
    if log_enabled!(log::Level::Trace) {
      match std::str::from_utf8(&$buf) {
        Ok(text) => {
          let $buf = text;
          trace!($($arg)*)
        },
        Err(_) => trace!($($arg)*),
      }
    }
  };
  ($buf:expr, $name:ident, $($arg:tt)*) => {
    if log_enabled!(log::Level::Trace) {
      match std::str::from_utf8(&$buf) {
        Ok(text) => {
          let $name = text;
          trace!($($arg)*)
        },
        Err(_) => trace!($($arg)*),
      }
    }
  };
  ($($arg:tt)*) => {
    if log_enabled!(log::Level::Trace) {
      trace!($($arg)*)
    }
  };
}
97
/// No-op variant used when the `decode-logs` feature is disabled: every invocation expands to
/// nothing.
#[cfg(not(feature = "decode-logs"))]
macro_rules! decode_log {
  ($buf:ident, $($arg:tt)*) => {};
  ($($arg:tt)*) => {};
}
103
104/// Utility function to translate RESP2 frames to RESP3 frames.
105///
106/// RESP2 frames and RESP3 frames are quite different, but RESP3 is largely a superset of RESP2 so this function will never return an error.
107///
108/// Redis handles the protocol choice based on the response to the `HELLO` command, so developers of higher level clients can be faced with a decision on which `Frame`
109/// struct to expose to callers.
110///
111/// This function can allow callers to take a dependency on the RESP3 interface by lazily translating RESP2 frames from the server to RESP3 frames.
112pub fn resp2_frame_to_resp3(frame: Resp2Frame) -> Resp3Frame {
113  if frame.is_normal_pubsub() {
114    let mut out = Vec::with_capacity(4);
115    out.push(Resp3Frame::SimpleString {
116      data: PUBSUB_PUSH_PREFIX.into(),
117      attributes: None,
118    });
119    out.push(Resp3Frame::SimpleString {
120      data: PUBSUB_PREFIX.into(),
121      attributes: None,
122    });
123    if let Resp2Frame::Array(mut inner) = frame {
124      // unwrap checked in is_normal_pubsub
125      let message = inner.pop().unwrap();
126      let channel = inner.pop().unwrap();
127      out.push(resp2_frame_to_resp3(channel));
128      out.push(resp2_frame_to_resp3(message));
129    } else {
130      panic!("Invalid pubsub frame conversion to resp3. This is a bug.");
131    }
132
133    return Resp3Frame::Push {
134      data: out,
135      attributes: None,
136    };
137  }
138  if frame.is_pattern_pubsub_message() {
139    let mut out = Vec::with_capacity(4);
140    out.push(Resp3Frame::SimpleString {
141      data: PUBSUB_PUSH_PREFIX.into(),
142      attributes: None,
143    });
144    out.push(Resp3Frame::SimpleString {
145      data: PATTERN_PUBSUB_PREFIX.into(),
146      attributes: None,
147    });
148    if let Resp2Frame::Array(mut inner) = frame {
149      // unwrap checked in is_normal_pubsub
150      let message = inner.pop().unwrap();
151      let channel = inner.pop().unwrap();
152      out.push(resp2_frame_to_resp3(channel));
153      out.push(resp2_frame_to_resp3(message));
154    } else {
155      panic!("Invalid pattern pubsub frame conversion to resp3. This is a bug.");
156    }
157
158    return Resp3Frame::Push {
159      data: out,
160      attributes: None,
161    };
162  }
163
164  match frame {
165    Resp2Frame::Integer(i) => Resp3Frame::Number {
166      data: i,
167      attributes: None,
168    },
169    Resp2Frame::Error(s) => Resp3Frame::SimpleError {
170      data: s,
171      attributes: None,
172    },
173    Resp2Frame::BulkString(d) => {
174      if d.len() < 6 {
175        match str::from_utf8(&d).ok() {
176          Some(s) => match s.as_ref() {
177            "true" => Resp3Frame::Boolean {
178              data: true,
179              attributes: None,
180            },
181            "false" => Resp3Frame::Boolean {
182              data: false,
183              attributes: None,
184            },
185            _ => Resp3Frame::BlobString {
186              data: d,
187              attributes: None,
188            },
189          },
190          None => Resp3Frame::BlobString {
191            data: d,
192            attributes: None,
193          },
194        }
195      } else {
196        Resp3Frame::BlobString {
197          data: d,
198          attributes: None,
199        }
200      }
201    },
202    Resp2Frame::SimpleString(s) => Resp3Frame::SimpleString {
203      data: s,
204      attributes: None,
205    },
206    Resp2Frame::Null => Resp3Frame::Null,
207    Resp2Frame::Array(data) => {
208      let mut out = Vec::with_capacity(data.len());
209      for frame in data.into_iter() {
210        out.push(resp2_frame_to_resp3(frame));
211      }
212      Resp3Frame::Array {
213        data: out,
214        attributes: None,
215      }
216    },
217  }
218}
219
220pub fn check_offset(x: &(&mut [u8], usize)) -> Result<(), GenError> {
221  if x.1 > x.0.len() {
222    error!("Invalid offset of {} with buf len {}", x.1, x.0.len());
223    Err(GenError::InvalidOffset)
224  } else {
225    Ok(())
226  }
227}
228
/// Returns the number of bytes necessary to encode a string representation of `d`.
///
/// The digits are counted with integer division, so the result is exact for every `usize`. A floating point `log10` over-counts just below large powers of
/// ten (e.g. `999_999_999_999_999` is reported as 16 digits because its logarithm rounds to exactly `15.0`) and loses precision for values above 2^53.
///
/// This needs neither `std` nor `libm`, so it is available regardless of which of those features is enabled.
pub fn digits_in_number(d: usize) -> usize {
  let mut remaining = d;
  // every number, including zero, has at least one digit
  let mut digits = 1;
  while remaining >= 10 {
    remaining /= 10;
    digits += 1;
  }

  digits
}
247
248// this is faster than repeat(0).take(amt) at the cost of some memory
249pub fn zero_extend(buf: &mut BytesMut, mut amt: usize) {
250  trace!("allocating more, len: {}, amt: {}", buf.len(), amt);
251
252  buf.reserve(amt);
253  while amt >= KB {
254    buf.extend_from_slice(ZEROED_KB);
255    amt -= KB;
256  }
257  if amt > 0 {
258    buf.extend_from_slice(&ZEROED_KB[0..amt]);
259  }
260}
261
/// Whether `payload` looks like a cluster redirection error.
///
/// Returns `true` when the payload starts with `MOVED` or `ASK` and consists of exactly three fields separated by single spaces.
pub fn is_cluster_error(payload: &str) -> bool {
  let has_redirect_prefix = payload.starts_with("MOVED") || payload.starts_with("ASK");

  has_redirect_prefix && payload.split(' ').count() == 3
}
269
270pub fn read_cluster_error(payload: &str) -> Option<Redirection> {
271  if payload.starts_with("MOVED") {
272    let parts: Vec<&str> = payload.split(" ").collect();
273    if parts.len() == 3 {
274      let slot = unwrap_return!(parts[1].parse::<u16>().ok());
275      let server = parts[2].to_owned();
276
277      Some(Redirection::Moved { slot, server })
278    } else {
279      None
280    }
281  } else if payload.starts_with("ASK") {
282    let parts: Vec<&str> = payload.split(" ").collect();
283    if parts.len() == 3 {
284      let slot = unwrap_return!(parts[1].parse::<u16>().ok());
285      let server = parts[2].to_owned();
286
287      Some(Redirection::Ask { slot, server })
288    } else {
289      None
290    }
291  } else {
292    None
293  }
294}
295
296/// Perform a crc16 XMODEM operation against a string slice.
297fn crc16_xmodem(key: &[u8]) -> u16 {
298  State::<XMODEM>::calculate(key) % REDIS_CLUSTER_SLOTS
299}
300
301/// Map a Redis key to its cluster key slot.
302///
303/// ```ignore
304/// $ redis-cli -p 30001 cluster keyslot "8xjx7vWrfPq54mKfFD3Y1CcjjofpnAcQ"
305/// (integer) 5458
306/// ```
307///
308/// ```no_run
309/// # use redis_protocol::redis_keyslot;
310/// assert_eq!(redis_keyslot("8xjx7vWrfPq54mKfFD3Y1CcjjofpnAcQ".as_bytes()), 5458);
311/// ```
312pub fn redis_keyslot(key: &[u8]) -> u16 {
313  let (mut i, mut j): (Option<usize>, Option<usize>) = (None, None);
314
315  for (idx, c) in key.iter().enumerate() {
316    if *c == b'{' {
317      i = Some(idx);
318      break;
319    }
320  }
321
322  if i.is_none() || (i.is_some() && i.unwrap() == key.len() - 1) {
323    return crc16_xmodem(key);
324  }
325
326  let i = i.unwrap();
327  for (idx, c) in key[i + 1..].iter().enumerate() {
328    if *c == b'}' {
329      j = Some(idx);
330      break;
331    }
332  }
333
334  if j.is_none() {
335    return crc16_xmodem(key);
336  }
337
338  let j = j.unwrap();
339  let out = if i + j == key.len() || j == 0 {
340    crc16_xmodem(key)
341  } else {
342    crc16_xmodem(&key[i + 1..i + j + 1])
343  };
344
345  out
346}
347
348pub(crate) fn to_byte_str<T>(data: T) -> Result<Str, RedisParseError<NomBytes>>
349where
350  T: AsRef<NomBytes>,
351{
352  let data_ref = data.as_ref();
353  Str::from_inner(data_ref.clone().into_bytes()).map_err(|_| RedisParseError::Nom(data_ref.clone(), NomErrorKind::Char))
354}
355
#[cfg(test)]
mod tests {
  use super::*;

  /// Slot of `123456789` (0x31C3), the hash tag used by the tagged test keys.
  const TAG_SLOT: u16 = 12739;

  /// Load the binary (non UTF-8) fixture used as a key.
  fn read_kitten_file() -> Vec<u8> {
    include_bytes!("../tests/kitten.jpeg").to_vec()
  }

  /// Assert that `key` maps to the `expected` cluster slot.
  fn assert_slot(key: &[u8], expected: u16) {
    assert_eq!(redis_keyslot(key), expected);
  }

  #[test]
  fn should_crc16_123456789() {
    assert_slot("123456789".as_bytes(), TAG_SLOT);
  }

  #[test]
  fn should_crc16_with_brackets() {
    assert_slot("foo{123456789}bar".as_bytes(), TAG_SLOT);
  }

  #[test]
  fn should_crc16_with_brackets_no_padding() {
    assert_slot("{123456789}".as_bytes(), TAG_SLOT);
  }

  #[test]
  fn should_crc16_with_invalid_brackets_lhs() {
    // 288A
    assert_slot("foo{123456789".as_bytes(), 10378);
  }

  #[test]
  fn should_crc16_with_invalid_brackets_rhs() {
    // 5B35 = 23349, 23349 % 16384 = 6965
    assert_slot("foo}123456789".as_bytes(), 6965);
  }

  #[test]
  fn should_crc16_with_random_string() {
    // 127.0.0.1:30001> cluster keyslot 8xjx7vWrfPq54mKfFD3Y1CcjjofpnAcQ
    // (integer) 5458
    assert_slot("8xjx7vWrfPq54mKfFD3Y1CcjjofpnAcQ".as_bytes(), 5458);
  }

  #[test]
  fn should_hash_non_ascii_string_bytes() {
    // 127.0.0.1:30001> cluster keyslot "💩 👻 💀 ☠️ 👽 👾"
    // (integer) 13954
    assert_slot("💩 👻 💀 ☠️ 👽 👾".as_bytes(), 13954);
  }

  #[test]
  fn should_hash_non_ascii_string_bytes_with_tag() {
    // 127.0.0.1:30001> cluster keyslot "💩 👻 💀{123456789}☠️ 👽 👾"
    // (integer) 12739
    assert_slot("💩 👻 💀{123456789}☠️ 👽 👾".as_bytes(), TAG_SLOT);
  }

  #[test]
  fn should_hash_non_utf8_string_bytes() {
    let key = read_kitten_file();
    assert_slot(&key, 1589);
  }

  #[test]
  fn should_hash_non_utf8_string_bytes_with_tag() {
    // overwrite part of the binary key with a hash tag, starting at byte 242
    let tag = "{123456789}".as_bytes();
    let mut key = read_kitten_file();
    key[242..242 + tag.len()].copy_from_slice(tag);

    assert_slot(&key, TAG_SLOT);
  }
}