Skip to main content

dynomite/proto/redis/
parser.rs

1//! Redis (RESP) wire-protocol parser.
2//!
3//! The parser walks the bytes of a flattened request or response
4//! buffer through a single state machine and updates the message
5//! in place. The state alphabet, transitions, and error paths
6//! mirror `redis_parse_req` and `redis_parse_rsp` in the reference
7//! engine.
8//!
9//! The parser is byte-driven and never allocates outside the
10//! [`Msg`]'s key and argument buffers. It must not panic on any
11//! input.
12
13// The parser truncates ASCII-decimal accumulators into the same
14// fixed-width counters the reference engine uses (`uint32_t` for
15// rlen / rntokens / nkeys, `usize` for cursor positions). The
16// allowance keeps the Rust port faithful to the C casts; the
17// out-of-range numerals surface as parse errors elsewhere in the
18// state machine.
19#![allow(clippy::cast_possible_truncation)]
20#![allow(clippy::cast_possible_wrap)]
21#![allow(clippy::too_many_arguments)]
22#![allow(clippy::match_same_arms)]
23#![allow(clippy::needless_continue)]
24#![allow(clippy::manual_let_else)]
25
26use super::commands::{classify, error_lookup, lookup, CommandClass, RoutingOverride};
27use crate::io::mbuf::MBUF_MAX_SIZE;
28use crate::msg::{ArgPos, KeyPos, Msg, MsgParseResult, MsgRouting, MsgType};
29
30const CR: u8 = b'\r';
31const LF: u8 = b'\n';
32
/// Pair of delimiter bytes that mark a hash tag inside a key.
///
/// When a `HashTag` is supplied to the request parser, each parsed
/// key is searched for the first `open` byte and then for the next
/// `close` byte after it; the bytes strictly between the two become
/// the key's routing tag. Keys that lack either delimiter are stored
/// without a tag.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct HashTag {
    /// Byte that opens the hash tag.
    pub open: u8,
    /// Byte that closes the hash tag.
    pub close: u8,
}
42
/// States of the request parser's state machine.
///
/// The discriminants are spelled out because the parser stores the
/// current state as a raw `u32` between calls and rebuilds it with
/// [`ReqState::from_u32`]; the numbers therefore have to stay stable.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u32)]
enum ReqState {
    // Multibulk header: `*<count>\r\n`.
    #[default]
    Start = 0,
    Narg = 1,
    NargLf = 2,
    // Command name bulk: `$<len>\r\n<name>\r\n`.
    ReqTypeLen = 3,
    ReqTypeLenLf = 4,
    ReqType = 5,
    ReqTypeLf = 6,
    // Key bulk.
    KeyLen = 7,
    KeyLenLf = 8,
    Key = 9,
    KeyLf = 10,
    // First argument bulk.
    Arg1Len = 11,
    Arg1LenLf = 12,
    Arg1 = 13,
    Arg1Lf = 14,
    // Second argument bulk.
    Arg2Len = 15,
    Arg2LenLf = 16,
    Arg2 = 17,
    Arg2Lf = 18,
    // Third argument bulk.
    Arg3Len = 19,
    Arg3LenLf = 20,
    Arg3 = 21,
    Arg3Lf = 22,
    // Any further argument bulks.
    ArgnLen = 23,
    ArgnLenLf = 24,
    Argn = 25,
    ArgnLf = 26,
    // Inline (non-multibulk) `PING`.
    InlinePing = 27,
}

impl ReqState {
    /// Every state, laid out so that a state's index in the table
    /// equals its `u32` discriminant.
    const BY_DISCRIMINANT: [Self; 28] = [
        Self::Start,
        Self::Narg,
        Self::NargLf,
        Self::ReqTypeLen,
        Self::ReqTypeLenLf,
        Self::ReqType,
        Self::ReqTypeLf,
        Self::KeyLen,
        Self::KeyLenLf,
        Self::Key,
        Self::KeyLf,
        Self::Arg1Len,
        Self::Arg1LenLf,
        Self::Arg1,
        Self::Arg1Lf,
        Self::Arg2Len,
        Self::Arg2LenLf,
        Self::Arg2,
        Self::Arg2Lf,
        Self::Arg3Len,
        Self::Arg3LenLf,
        Self::Arg3,
        Self::Arg3Lf,
        Self::ArgnLen,
        Self::ArgnLenLf,
        Self::Argn,
        Self::ArgnLf,
        Self::InlinePing,
    ];

    /// Rebuilds a state from its raw discriminant.
    ///
    /// Any value that does not name a state maps to [`ReqState::Start`]
    /// (the `Default` variant), so a corrupt stored value restarts the
    /// parser instead of panicking.
    fn from_u32(v: u32) -> Self {
        // `u32 -> usize` is lossless on every target this crate
        // supports; out-of-table indices fall through to the default.
        Self::BY_DISCRIMINANT
            .get(v as usize)
            .copied()
            .unwrap_or_default()
    }
}
111
/// States of the response parser's state machine.
///
/// The discriminants are explicit so that a state can be stored as a
/// raw `u32` and rebuilt with [`RspState::from_u32`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u32)]
enum RspState {
    #[default]
    Start = 0,
    Status = 1,
    Error = 2,
    Integer = 3,
    IntegerStart = 4,
    Simple = 5,
    Bulk = 6,
    BulkLf = 7,
    BulkArg = 8,
    BulkArgLf = 9,
    Multibulk = 10,
    MultibulkNargLf = 11,
    MultibulkArgnLen = 12,
    MultibulkArgnLenLf = 13,
    MultibulkArgn = 14,
    MultibulkArgnLf = 15,
    RuntoCrlf = 16,
    AlmostDone = 17,
}

impl RspState {
    /// Every state, laid out so that a state's index in the table
    /// equals its `u32` discriminant.
    const BY_DISCRIMINANT: [Self; 18] = [
        Self::Start,
        Self::Status,
        Self::Error,
        Self::Integer,
        Self::IntegerStart,
        Self::Simple,
        Self::Bulk,
        Self::BulkLf,
        Self::BulkArg,
        Self::BulkArgLf,
        Self::Multibulk,
        Self::MultibulkNargLf,
        Self::MultibulkArgnLen,
        Self::MultibulkArgnLenLf,
        Self::MultibulkArgn,
        Self::MultibulkArgnLf,
        Self::RuntoCrlf,
        Self::AlmostDone,
    ];

    /// Rebuilds a state from its raw discriminant.
    ///
    /// Any value that does not name a state maps to [`RspState::Start`]
    /// (the `Default` variant).
    fn from_u32(v: u32) -> Self {
        // `u32 -> usize` is lossless on every target this crate
        // supports; out-of-table indices fall through to the default.
        Self::BY_DISCRIMINANT
            .get(v as usize)
            .copied()
            .unwrap_or_default()
    }
}
160
161/// Parse a Redis request from `input` and update `r` in place.
162///
163/// On success `r.ty()` is set to the recognised command, the
164/// parsed keys are appended to [`Msg::keys`], the argument buffer
165/// is left untouched (callers that need argument bytes use
166/// [`redis_parse_req_with_args`]), and the parser cursor advances
167/// just past the trailing LF.
168///
169/// # Examples
170///
171/// ```
172/// use dynomite::msg::{Msg, MsgParseResult, MsgType};
173/// use dynomite::proto::redis::redis_parse_req;
174///
175/// let mut r = Msg::new(0, MsgType::Unknown, true);
176/// let res = redis_parse_req(&mut r, b"*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n");
177/// assert_eq!(res, MsgParseResult::Ok);
178/// assert_eq!(r.ty(), MsgType::ReqRedisGet);
179/// assert_eq!(r.keys()[0].key(), b"foo");
180/// ```
181pub fn redis_parse_req(r: &mut Msg, input: &[u8]) -> MsgParseResult {
182    redis_parse_req_with_args(r, input, None, true)
183}
184
185/// Variant of [`redis_parse_req`] that records all bulk arguments
186/// (beyond the keys) into [`Msg::args`]. Used by the rewrite path
187/// that needs each argument's bytes.
188///
189/// # Examples
190///
191/// ```
192/// use dynomite::msg::{Msg, MsgParseResult, MsgType};
193/// use dynomite::proto::redis::parser::redis_parse_req_with_args;
194///
195/// let mut r = Msg::new(0, MsgType::Unknown, true);
196/// let bytes = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
197/// let res = redis_parse_req_with_args(&mut r, bytes, None, true);
198/// assert_eq!(res, MsgParseResult::Ok);
199/// assert_eq!(r.args()[0].bytes(), b"bar");
200/// ```
201#[allow(clippy::too_many_lines)]
202pub fn redis_parse_req_with_args(
203    r: &mut Msg,
204    input: &[u8],
205    hash_tag: Option<HashTag>,
206    record_args: bool,
207) -> MsgParseResult {
208    if !r.is_request() {
209        r.set_parse_result(MsgParseResult::Error);
210        return MsgParseResult::Error;
211    }
212    let mut state = ReqState::from_u32(r.parser_state());
213    let mut p = r.parser_pos();
214    let mut token: Option<usize> = r.parser_token();
215    let mut rlen = r.rlen();
216    let mut rntokens = r.rntokens();
217    let mut ntokens = r.ntokens();
218    let mut nkeys = r.nkeys();
219    let mut ty = r.ty();
220    let mut is_read = r.flags().is_read;
221    let mut quit = r.flags().quit;
222    let mut routing = r.routing();
223
224    while p < input.len() {
225        let ch = input[p];
226        match state {
227            ReqState::Start | ReqState::Narg => {
228                if token.is_none() {
229                    if ch == b'p' || ch == b'P' {
230                        state = ReqState::InlinePing;
231                        p += 1;
232                        continue;
233                    }
234                    if ch != b'*' {
235                        return finish_req_error(
236                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
237                            routing,
238                        );
239                    }
240                    token = Some(p);
241                    rntokens = 0;
242                    state = ReqState::Narg;
243                    p += 1;
244                } else if ch.is_ascii_digit() {
245                    rntokens = rntokens
246                        .saturating_mul(10)
247                        .saturating_add(u32::from(ch - b'0'));
248                    p += 1;
249                } else if ch == CR {
250                    if rntokens == 0 {
251                        return finish_req_error(
252                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
253                            routing,
254                        );
255                    }
256                    ntokens = rntokens;
257                    token = None;
258                    state = ReqState::NargLf;
259                    p += 1;
260                } else {
261                    return finish_req_error(
262                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
263                        routing,
264                    );
265                }
266            }
267            ReqState::InlinePing => {
268                // `pInG` then trailing CRLF (4 more bytes).
269                if input.len() - p >= 5
270                    && input[p].eq_ignore_ascii_case(&b'i')
271                    && input[p + 1].eq_ignore_ascii_case(&b'n')
272                    && input[p + 2].eq_ignore_ascii_case(&b'g')
273                    && input[p + 3] == CR
274                    && input[p + 4] == LF
275                {
276                    ty = MsgType::ReqRedisPing;
277                    is_read = true;
278                    routing = MsgRouting::LocalNodeOnly;
279                    p += 5;
280                    return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
281                }
282                return finish_req_error(
283                    r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit, routing,
284                );
285            }
286            ReqState::NargLf => {
287                if ch == LF {
288                    state = ReqState::ReqTypeLen;
289                    p += 1;
290                } else {
291                    return finish_req_error(
292                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
293                        routing,
294                    );
295                }
296            }
297            ReqState::ReqTypeLen => {
298                if token.is_none() {
299                    if ch != b'$' {
300                        return finish_req_error(
301                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
302                            routing,
303                        );
304                    }
305                    token = Some(p);
306                    rlen = 0;
307                    p += 1;
308                } else if ch.is_ascii_digit() {
309                    rlen = rlen.saturating_mul(10).saturating_add(u32::from(ch - b'0'));
310                    p += 1;
311                } else if ch == CR {
312                    if rlen == 0 || rntokens == 0 {
313                        return finish_req_error(
314                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
315                            routing,
316                        );
317                    }
318                    rntokens -= 1;
319                    token = None;
320                    state = ReqState::ReqTypeLenLf;
321                    p += 1;
322                } else {
323                    return finish_req_error(
324                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
325                        routing,
326                    );
327                }
328            }
329            ReqState::ReqTypeLenLf => {
330                if ch == LF {
331                    state = ReqState::ReqType;
332                    p += 1;
333                } else {
334                    return finish_req_error(
335                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
336                        routing,
337                    );
338                }
339            }
340            ReqState::ReqType => {
341                if token.is_none() {
342                    token = Some(p);
343                }
344                let start = token.expect("token recorded");
345                let needed = start + rlen as usize;
346                if needed >= input.len() {
347                    // Need more bytes.
348                    p = input.len();
349                    break;
350                }
351                if input[needed] != CR {
352                    return finish_req_error(
353                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
354                        routing,
355                    );
356                }
357                let cmd_bytes = &input[start..needed];
358                p = needed + 1;
359                rlen = 0;
360                token = None;
361                let prev_ty = ty;
362                if prev_ty != MsgType::ReqRedisScript {
363                    ty = MsgType::Unknown;
364                }
365                if let Some((found, traits)) = lookup(cmd_bytes) {
366                    ty = found;
367                    is_read = traits.is_read;
368                    quit = traits.quit;
369                    routing = match traits.routing {
370                        RoutingOverride::None => routing,
371                        RoutingOverride::LocalNodeOnly => MsgRouting::LocalNodeOnly,
372                        RoutingOverride::TokenOwnerLocalRackOnly => {
373                            MsgRouting::TokenOwnerLocalRackOnly
374                        }
375                        RoutingOverride::AllNodesAllRacksAllDcs => {
376                            MsgRouting::AllNodesAllRacksAllDcs
377                        }
378                    };
379                    if ty == MsgType::ReqRedisExists && prev_ty == MsgType::ReqRedisScript {
380                        ty = MsgType::ReqRedisScriptExists;
381                        routing = MsgRouting::AllNodesAllRacksAllDcs;
382                        is_read = true;
383                    }
384                    if ty == MsgType::ReqRedisPing {
385                        // The C parser short-circuits the inline form here.
386                        // p was advanced to needed+1 (the LF position).
387                        return finish_req_ok(
388                            r,
389                            p + 1,
390                            ty,
391                            true,
392                            quit,
393                            MsgRouting::LocalNodeOnly,
394                            ntokens,
395                            nkeys,
396                        );
397                    }
398                } else {
399                    return finish_req_error(
400                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
401                        routing,
402                    );
403                }
404                state = ReqState::ReqTypeLf;
405                continue;
406            }
407            ReqState::ReqTypeLf => {
408                if ty == MsgType::ReqRedisScript {
409                    state = ReqState::ReqTypeLen;
410                    p += 1;
411                    continue;
412                }
413                if ty == MsgType::HackSettingConnConsistency {
414                    state = ReqState::Arg1Len;
415                    p += 1;
416                    continue;
417                }
418                if ch != LF {
419                    return finish_req_error(
420                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
421                        routing,
422                    );
423                }
424                p += 1;
425                let class = classify(ty);
426                if matches!(class, CommandClass::Argz) && rntokens == 0 {
427                    return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
428                }
429                if class == CommandClass::ArgUpto1 && rntokens == 0 {
430                    return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
431                }
432                if class == CommandClass::ArgUpto1 && rntokens == 1 {
433                    state = ReqState::Arg1Len;
434                    continue;
435                }
436                if matches!(
437                    ty,
438                    MsgType::ReqRedisScriptLoad | MsgType::ReqRedisScriptExists
439                ) {
440                    state = ReqState::Arg1Len;
441                    continue;
442                }
443                if class == CommandClass::ArgEval {
444                    state = ReqState::Arg1Len;
445                    continue;
446                }
447                state = ReqState::KeyLen;
448            }
449            ReqState::KeyLen => {
450                if token.is_none() {
451                    if ch != b'$' {
452                        return finish_req_error(
453                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
454                            routing,
455                        );
456                    }
457                    token = Some(p);
458                    rlen = 0;
459                    p += 1;
460                } else if ch.is_ascii_digit() {
461                    rlen = rlen.saturating_mul(10).saturating_add(u32::from(ch - b'0'));
462                    p += 1;
463                } else if ch == CR {
464                    if rlen == 0 || rlen as usize >= MBUF_MAX_SIZE || rntokens == 0 {
465                        return finish_req_error(
466                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
467                            routing,
468                        );
469                    }
470                    rntokens -= 1;
471                    token = None;
472                    state = ReqState::KeyLenLf;
473                    p += 1;
474                } else {
475                    return finish_req_error(
476                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
477                        routing,
478                    );
479                }
480            }
481            ReqState::KeyLenLf => {
482                if ch == LF {
483                    state = ReqState::Key;
484                    p += 1;
485                } else {
486                    return finish_req_error(
487                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
488                        routing,
489                    );
490                }
491            }
492            ReqState::Key => {
493                if token.is_none() {
494                    token = Some(p);
495                }
496                let start = token.expect("token recorded");
497                let needed = start + rlen as usize;
498                if needed >= input.len() {
499                    p = input.len();
500                    break;
501                }
502                if input[needed] != CR {
503                    return finish_req_error(
504                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
505                        routing,
506                    );
507                }
508                let kbytes = input[start..needed].to_vec();
509                p = needed + 1;
510                rlen = 0;
511                let kp = if let Some(tag) = hash_tag {
512                    if let Some(open_idx) = kbytes.iter().position(|&b| b == tag.open) {
513                        if let Some(close_off) =
514                            kbytes[open_idx + 1..].iter().position(|&b| b == tag.close)
515                        {
516                            let s = open_idx + 1;
517                            let e = open_idx + 1 + close_off;
518                            KeyPos::new(kbytes, s..e)
519                        } else {
520                            KeyPos::without_tag(kbytes)
521                        }
522                    } else {
523                        KeyPos::without_tag(kbytes)
524                    }
525                } else {
526                    KeyPos::without_tag(kbytes)
527                };
528                r.push_key(kp);
529                token = None;
530                state = ReqState::KeyLf;
531            }
532            ReqState::KeyLf => {
533                if ch != LF {
534                    return finish_req_error(
535                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
536                        routing,
537                    );
538                }
539                p += 1;
540                let class = classify(ty);
541                match class {
542                    CommandClass::Arg0 => {
543                        if rntokens != 0 {
544                            return finish_req_error(
545                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
546                                quit, routing,
547                            );
548                        }
549                        return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
550                    }
551                    CommandClass::Arg1 => {
552                        if rntokens != 1 {
553                            return finish_req_error(
554                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
555                                quit, routing,
556                            );
557                        }
558                        state = ReqState::Arg1Len;
559                    }
560                    CommandClass::Arg2 => {
561                        if rntokens != 2 {
562                            return finish_req_error(
563                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
564                                quit, routing,
565                            );
566                        }
567                        state = ReqState::Arg1Len;
568                    }
569                    CommandClass::Arg3 => {
570                        if rntokens != 3 {
571                            return finish_req_error(
572                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
573                                quit, routing,
574                            );
575                        }
576                        state = ReqState::Arg1Len;
577                    }
578                    CommandClass::ArgN => {
579                        if rntokens == 0 {
580                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
581                        }
582                        state = ReqState::Arg1Len;
583                    }
584                    CommandClass::ArgX => {
585                        if rntokens == 0 {
586                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
587                        }
588                        state = ReqState::KeyLen;
589                    }
590                    CommandClass::ArgKvX => {
591                        if ntokens.is_multiple_of(2) {
592                            return finish_req_error(
593                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
594                                quit, routing,
595                            );
596                        }
597                        state = ReqState::Arg1Len;
598                    }
599                    CommandClass::ArgEval => {
600                        nkeys = nkeys.saturating_sub(1);
601                        if nkeys > 0 {
602                            state = ReqState::KeyLen;
603                        } else if rntokens > 0 {
604                            state = ReqState::ArgnLen;
605                        } else {
606                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
607                        }
608                    }
609                    _ => {
610                        return finish_req_error(
611                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
612                            routing,
613                        );
614                    }
615                }
616            }
617            ReqState::Arg1Len => {
618                match read_bulk_len(input, &mut p, &mut token, &mut rlen, &mut rntokens) {
619                    BulkLenStep::More => {}
620                    BulkLenStep::Done => state = ReqState::Arg1LenLf,
621                    BulkLenStep::Error => {
622                        return finish_req_error(
623                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
624                            routing,
625                        );
626                    }
627                    BulkLenStep::Eof => break,
628                }
629            }
630            ReqState::Arg1LenLf => {
631                if ch == LF {
632                    state = ReqState::Arg1;
633                    p += 1;
634                } else {
635                    return finish_req_error(
636                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
637                        routing,
638                    );
639                }
640            }
641            ReqState::Arg1 => match read_bulk_arg(input, p, rlen, record_args, r) {
642                ArgStep::More => {
643                    p = input.len();
644                    let consumed = (p - r.parser_pos()) as u32;
645                    let _ = consumed;
646                    break;
647                }
648                ArgStep::Done(new_p) => {
649                    p = new_p;
650                    rlen = 0;
651                    state = ReqState::Arg1Lf;
652                }
653                ArgStep::Error => {
654                    return finish_req_error(
655                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
656                        routing,
657                    );
658                }
659            },
660            ReqState::Arg1Lf => {
661                if ch != LF {
662                    return finish_req_error(
663                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
664                        routing,
665                    );
666                }
667                p += 1;
668                let class = classify(ty);
669                match class {
670                    CommandClass::ArgUpto1 | CommandClass::Arg1 => {
671                        if rntokens != 0 {
672                            return finish_req_error(
673                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
674                                quit, routing,
675                            );
676                        }
677                        return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
678                    }
679                    CommandClass::Arg2 => {
680                        if rntokens != 1 {
681                            return finish_req_error(
682                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
683                                quit, routing,
684                            );
685                        }
686                        state = ReqState::Arg2Len;
687                    }
688                    CommandClass::Arg3 => {
689                        if rntokens != 2 {
690                            return finish_req_error(
691                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
692                                quit, routing,
693                            );
694                        }
695                        state = ReqState::Arg2Len;
696                    }
697                    CommandClass::ArgN => {
698                        if rntokens == 0 {
699                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
700                        }
701                        state = ReqState::ArgnLen;
702                    }
703                    CommandClass::ArgEval => {
704                        // After parsing the script body we need at
705                        // least the `numkeys` argument to follow.
706                        // Valid Redis EVAL forms include `EVAL
707                        // "script" 0` (no keys, no args) which has
708                        // rntokens == 1 after the script. The
709                        // earlier `< 2` bound rejected that and
710                        // closed the client connection on every
711                        // EVAL-with-no-extra-args invocation.
712                        if rntokens < 1 {
713                            return finish_req_error(
714                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
715                                quit, routing,
716                            );
717                        }
718                        state = ReqState::Arg2Len;
719                    }
720                    CommandClass::ArgKvX => {
721                        if rntokens == 0 {
722                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
723                        }
724                        state = ReqState::KeyLen;
725                    }
726                    _ => {
727                        return finish_req_error(
728                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
729                            routing,
730                        );
731                    }
732                }
733            }
734            ReqState::Arg2Len => {
735                match read_bulk_len(input, &mut p, &mut token, &mut rlen, &mut rntokens) {
736                    BulkLenStep::More => {}
737                    BulkLenStep::Done => state = ReqState::Arg2LenLf,
738                    BulkLenStep::Error => {
739                        return finish_req_error(
740                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
741                            routing,
742                        );
743                    }
744                    BulkLenStep::Eof => break,
745                }
746            }
747            ReqState::Arg2LenLf => {
748                if ch == LF {
749                    state = ReqState::Arg2;
750                    p += 1;
751                } else {
752                    return finish_req_error(
753                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
754                        routing,
755                    );
756                }
757            }
758            ReqState::Arg2 => {
759                let class = classify(ty);
760                let token_start = if class == CommandClass::ArgEval && token.is_none() {
761                    Some(p)
762                } else {
763                    token
764                };
765                token = token_start;
766                match read_bulk_arg(input, p, rlen, record_args, r) {
767                    ArgStep::More => {
768                        p = input.len();
769                        break;
770                    }
771                    ArgStep::Done(new_p) => {
772                        p = new_p;
773                        if class == CommandClass::ArgEval {
774                            // Token holds the start of the integer.
775                            // `read_bulk_arg` returns `needed + 1`,
776                            // so position `p - 1` is the trailing
777                            // `\r`. The digit string is
778                            // `input[start..p - 1]`.
779                            let start = token.unwrap_or(0);
780                            if start >= p.saturating_sub(1) {
781                                return finish_req_error(
782                                    r, state, p, token, rlen, rntokens, ntokens, nkeys, ty,
783                                    is_read, quit, routing,
784                                );
785                            }
786                            let mut nkey: u32 = 0;
787                            for &b in &input[start..p - 1] {
788                                if b.is_ascii_digit() {
789                                    nkey =
790                                        nkey.saturating_mul(10).saturating_add(u32::from(b - b'0'));
791                                } else {
792                                    return finish_req_error(
793                                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty,
794                                        is_read, quit, routing,
795                                    );
796                                }
797                            }
798                            // numkeys may legitimately be 0 (script
799                            // takes no keys). Validate only that
800                            // there are at least `nkey` more bulk
801                            // tokens to follow.
802                            if rntokens < nkey {
803                                return finish_req_error(
804                                    r, state, p, token, rlen, rntokens, ntokens, nkeys, ty,
805                                    is_read, quit, routing,
806                                );
807                            }
808                            nkeys = nkey;
809                            token = None;
810                        }
811                        rlen = 0;
812                        state = ReqState::Arg2Lf;
813                    }
814                    ArgStep::Error => {
815                        return finish_req_error(
816                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
817                            routing,
818                        );
819                    }
820                }
821            }
822            ReqState::Arg2Lf => {
823                if ch != LF {
824                    return finish_req_error(
825                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
826                        routing,
827                    );
828                }
829                p += 1;
830                let class = classify(ty);
831                match class {
832                    CommandClass::Arg2 => {
833                        if rntokens != 0 {
834                            return finish_req_error(
835                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
836                                quit, routing,
837                            );
838                        }
839                        return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
840                    }
841                    CommandClass::Arg3 => {
842                        if rntokens != 1 {
843                            return finish_req_error(
844                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
845                                quit, routing,
846                            );
847                        }
848                        state = ReqState::Arg3Len;
849                    }
850                    CommandClass::ArgN => {
851                        if rntokens == 0 {
852                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
853                        }
854                        state = ReqState::ArgnLen;
855                    }
856                    CommandClass::ArgEval => {
857                        // After parsing the numkeys arg, the
858                        // remaining tokens are `numkeys` keys
859                        // followed by zero or more args. If
860                        // numkeys=0 AND no more args remain we
861                        // are done with a valid `EVAL "script"
862                        // 0` request.
863                        if rntokens == 0 {
864                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
865                        }
866                        // Move to the keys section if numkeys > 0
867                        // OR to the args section if numkeys == 0
868                        // and we still have arg tokens to read.
869                        // The state-transition itself does not
870                        // need to know which case we are in:
871                        // KeyLen reads the next bulk len whether
872                        // it represents a key (recorded with
873                        // routing tag) or an arg (recorded with
874                        // record_args). For EVAL with numkeys=0
875                        // we want the arg path, so route to
876                        // ArgnLen instead.
877                        if nkeys == 0 {
878                            state = ReqState::ArgnLen;
879                        } else {
880                            state = ReqState::KeyLen;
881                        }
882                    }
883                    _ => {
884                        return finish_req_error(
885                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
886                            routing,
887                        );
888                    }
889                }
890            }
891            ReqState::Arg3Len => {
892                match read_bulk_len(input, &mut p, &mut token, &mut rlen, &mut rntokens) {
893                    BulkLenStep::More => {}
894                    BulkLenStep::Done => state = ReqState::Arg3LenLf,
895                    BulkLenStep::Error => {
896                        return finish_req_error(
897                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
898                            routing,
899                        );
900                    }
901                    BulkLenStep::Eof => break,
902                }
903            }
904            ReqState::Arg3LenLf => {
905                if ch == LF {
906                    state = ReqState::Arg3;
907                    p += 1;
908                } else {
909                    return finish_req_error(
910                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
911                        routing,
912                    );
913                }
914            }
915            ReqState::Arg3 => match read_bulk_arg(input, p, rlen, record_args, r) {
916                ArgStep::More => {
917                    p = input.len();
918                    break;
919                }
920                ArgStep::Done(new_p) => {
921                    p = new_p;
922                    rlen = 0;
923                    state = ReqState::Arg3Lf;
924                }
925                ArgStep::Error => {
926                    return finish_req_error(
927                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
928                        routing,
929                    );
930                }
931            },
932            ReqState::Arg3Lf => {
933                if ch != LF {
934                    return finish_req_error(
935                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
936                        routing,
937                    );
938                }
939                p += 1;
940                let class = classify(ty);
941                match class {
942                    CommandClass::Arg3 => {
943                        if rntokens != 0 {
944                            return finish_req_error(
945                                r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read,
946                                quit, routing,
947                            );
948                        }
949                        return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
950                    }
951                    CommandClass::ArgN => {
952                        if rntokens == 0 {
953                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
954                        }
955                        state = ReqState::ArgnLen;
956                    }
957                    _ => {
958                        return finish_req_error(
959                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
960                            routing,
961                        );
962                    }
963                }
964            }
965            ReqState::ArgnLen => {
966                match read_bulk_len(input, &mut p, &mut token, &mut rlen, &mut rntokens) {
967                    BulkLenStep::More => {}
968                    BulkLenStep::Done => state = ReqState::ArgnLenLf,
969                    BulkLenStep::Error => {
970                        return finish_req_error(
971                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
972                            routing,
973                        );
974                    }
975                    BulkLenStep::Eof => break,
976                }
977            }
978            ReqState::ArgnLenLf => {
979                if ch == LF {
980                    state = ReqState::Argn;
981                    p += 1;
982                } else {
983                    return finish_req_error(
984                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
985                        routing,
986                    );
987                }
988            }
989            ReqState::Argn => match read_bulk_arg(input, p, rlen, record_args, r) {
990                ArgStep::More => {
991                    p = input.len();
992                    break;
993                }
994                ArgStep::Done(new_p) => {
995                    p = new_p;
996                    rlen = 0;
997                    state = ReqState::ArgnLf;
998                }
999                ArgStep::Error => {
1000                    return finish_req_error(
1001                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
1002                        routing,
1003                    );
1004                }
1005            },
1006            ReqState::ArgnLf => {
1007                if ch != LF {
1008                    return finish_req_error(
1009                        r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
1010                        routing,
1011                    );
1012                }
1013                p += 1;
1014                let class = classify(ty);
1015                match class {
1016                    CommandClass::ArgN | CommandClass::ArgEval => {
1017                        if rntokens == 0 {
1018                            return finish_req_ok(r, p, ty, is_read, quit, routing, ntokens, nkeys);
1019                        }
1020                        state = ReqState::ArgnLen;
1021                    }
1022                    _ => {
1023                        return finish_req_error(
1024                            r, state, p, token, rlen, rntokens, ntokens, nkeys, ty, is_read, quit,
1025                            routing,
1026                        );
1027                    }
1028                }
1029            }
1030        }
1031    }
1032
1033    // Reached end of input without completing.
1034    r.set_parser_state(state as u32);
1035    r.set_parser_pos(p);
1036    r.set_parser_token(token);
1037    r.set_rlen(rlen);
1038    r.set_rntokens(rntokens);
1039    r.set_ntokens(ntokens);
1040    r.set_nkeys(nkeys);
1041    if ty != MsgType::Unknown {
1042        r.set_type(ty);
1043    }
1044    r.flags_mut().is_read = is_read;
1045    r.flags_mut().quit = quit;
1046    r.flags_mut().rewrite_with_ts_possible = false;
1047    r.set_routing(routing);
1048    r.set_parse_result(MsgParseResult::Again);
1049    MsgParseResult::Again
1050}
1051
/// Outcome of handing one byte of a `$<len>\r` bulk-length header to
/// `read_bulk_len`.
#[derive(Debug)]
enum BulkLenStep {
    /// The byte (`$` or a decimal digit) was consumed; the header is
    /// not complete yet.
    More,
    /// The terminating CR was consumed and the length is final.
    Done,
    /// The header is malformed, has no digits, or no further token
    /// was expected.
    Error,
    /// The cursor is at or past the end of the input.
    Eof,
}
1059
1060fn read_bulk_len(
1061    input: &[u8],
1062    p: &mut usize,
1063    token: &mut Option<usize>,
1064    rlen: &mut u32,
1065    rntokens: &mut u32,
1066) -> BulkLenStep {
1067    if *p >= input.len() {
1068        return BulkLenStep::Eof;
1069    }
1070    let ch = input[*p];
1071    if token.is_none() {
1072        if ch != b'$' {
1073            return BulkLenStep::Error;
1074        }
1075        *token = Some(*p);
1076        *rlen = 0;
1077        *p += 1;
1078        return BulkLenStep::More;
1079    }
1080    if ch.is_ascii_digit() {
1081        *rlen = rlen.saturating_mul(10).saturating_add(u32::from(ch - b'0'));
1082        *p += 1;
1083        BulkLenStep::More
1084    } else if ch == CR {
1085        let start = token.expect("token recorded");
1086        if *p - start <= 1 || *rntokens == 0 {
1087            return BulkLenStep::Error;
1088        }
1089        *rntokens -= 1;
1090        *token = None;
1091        *p += 1;
1092        BulkLenStep::Done
1093    } else {
1094        BulkLenStep::Error
1095    }
1096}
1097
/// Outcome of `read_bulk_arg` when it tries to skip over one bulk
/// argument body and its trailing CR.
#[derive(Debug)]
enum ArgStep {
    /// The input ends before the argument's trailing CR; more bytes
    /// are required.
    More,
    /// The argument and its trailing CR were consumed. Carries the
    /// new cursor, which is one past the CR.
    Done(usize),
    /// The byte where the trailing CR was expected is something else,
    /// or the end offset overflowed.
    Error,
}
1107
1108fn read_bulk_arg(input: &[u8], p: usize, rlen: u32, record: bool, r: &mut Msg) -> ArgStep {
1109    let needed = match p.checked_add(rlen as usize) {
1110        Some(n) => n,
1111        None => return ArgStep::Error,
1112    };
1113    if needed >= input.len() {
1114        return ArgStep::More;
1115    }
1116    if input[needed] != CR {
1117        return ArgStep::Error;
1118    }
1119    if record && rlen > 0 {
1120        r.push_arg(ArgPos::new(input[p..needed].to_vec()));
1121    }
1122    ArgStep::Done(needed + 1)
1123}
1124
1125#[allow(clippy::too_many_arguments)]
1126fn finish_req_ok(
1127    r: &mut Msg,
1128    pos: usize,
1129    ty: MsgType,
1130    is_read: bool,
1131    quit: bool,
1132    routing: MsgRouting,
1133    ntokens: u32,
1134    nkeys: u32,
1135) -> MsgParseResult {
1136    if ty == MsgType::Unknown {
1137        r.set_parse_result(MsgParseResult::Error);
1138        return MsgParseResult::Error;
1139    }
1140    r.set_type(ty);
1141    r.flags_mut().is_read = is_read;
1142    r.flags_mut().quit = quit;
1143    r.set_routing(routing);
1144    r.set_ntokens(ntokens);
1145    r.set_nkeys(nkeys);
1146    r.set_parser_state(0);
1147    r.set_parser_pos(pos);
1148    r.set_parser_token(None);
1149    r.set_parse_result(MsgParseResult::Ok);
1150    MsgParseResult::Ok
1151}
1152
1153#[allow(clippy::too_many_arguments)]
1154fn finish_req_error(
1155    r: &mut Msg,
1156    state: ReqState,
1157    pos: usize,
1158    token: Option<usize>,
1159    rlen: u32,
1160    rntokens: u32,
1161    ntokens: u32,
1162    nkeys: u32,
1163    ty: MsgType,
1164    is_read: bool,
1165    quit: bool,
1166    routing: MsgRouting,
1167) -> MsgParseResult {
1168    r.set_parser_state(state as u32);
1169    r.set_parser_pos(pos);
1170    r.set_parser_token(token);
1171    r.set_rlen(rlen);
1172    r.set_rntokens(rntokens);
1173    r.set_ntokens(ntokens);
1174    r.set_nkeys(nkeys);
1175    if ty != MsgType::Unknown {
1176        r.set_type(ty);
1177    }
1178    r.flags_mut().is_read = is_read;
1179    r.flags_mut().quit = quit;
1180    r.set_routing(routing);
1181    r.set_parse_result(MsgParseResult::Error);
1182    MsgParseResult::Error
1183}
1184
1185/// Parse a Redis response from `input` and update `r` in place.
1186///
1187/// On success the response type tag is set, the integer payload
1188/// (for `:n\r\n` responses) is recorded on the message, and the
1189/// parser cursor advances just past the trailing LF. On truncated
1190/// input the function returns [`MsgParseResult::Again`]. Invalid
1191/// bytes return [`MsgParseResult::Error`].
1192///
1193/// # Examples
1194///
1195/// ```
1196/// use dynomite::msg::{Msg, MsgParseResult, MsgType};
1197/// use dynomite::proto::redis::redis_parse_rsp;
1198///
1199/// let mut r = Msg::new(0, MsgType::Unknown, false);
1200/// let res = redis_parse_rsp(&mut r, b"+OK\r\n");
1201/// assert_eq!(res, MsgParseResult::Ok);
1202/// assert_eq!(r.ty(), MsgType::RspRedisStatus);
1203/// ```
1204#[allow(clippy::too_many_lines)]
1205pub fn redis_parse_rsp(r: &mut Msg, input: &[u8]) -> MsgParseResult {
1206    if r.is_request() {
1207        r.set_parse_result(MsgParseResult::Error);
1208        return MsgParseResult::Error;
1209    }
1210    let mut state = RspState::from_u32(r.parser_state());
1211    let mut p = r.parser_pos();
1212    let mut token: Option<usize> = r.parser_token();
1213    let mut rlen = r.rlen();
1214    let mut rntokens = r.rntokens();
1215    let mut ty = r.ty();
1216    let mut integer = r.integer();
1217    let mut int_negative = false;
1218    let mut ntoken_start: Option<usize> = r.ntoken_start();
1219    let mut ntoken_end: Option<usize> = r.ntoken_end();
1220
1221    while p < input.len() {
1222        let ch = input[p];
1223        match state {
1224            RspState::Start => {
1225                ty = MsgType::Unknown;
1226                match ch {
1227                    b'+' => {
1228                        ty = MsgType::RspRedisStatus;
1229                        state = RspState::RuntoCrlf;
1230                        p += 1;
1231                    }
1232                    b'-' => {
1233                        ty = MsgType::RspRedisError;
1234                        state = RspState::Error;
1235                    }
1236                    b':' => {
1237                        ty = MsgType::RspRedisInteger;
1238                        state = RspState::IntegerStart;
1239                        integer = 0;
1240                        int_negative = false;
1241                        p += 1;
1242                    }
1243                    b'$' => {
1244                        ty = MsgType::RspRedisBulk;
1245                        state = RspState::Bulk;
1246                    }
1247                    b'*' => {
1248                        ty = MsgType::RspRedisMultibulk;
1249                        state = RspState::Multibulk;
1250                    }
1251                    _ => {
1252                        return finish_rsp_error(
1253                            r,
1254                            state,
1255                            p,
1256                            token,
1257                            rlen,
1258                            rntokens,
1259                            ty,
1260                            integer,
1261                            ntoken_start,
1262                            ntoken_end,
1263                        );
1264                    }
1265                }
1266            }
1267            // Resume-from-state entry: the parser API allows callers
1268            // to resume from a saved RspState across input chunks.
1269            // Status and Integer entries are no-op transitions to
1270            // their respective body states (RuntoCrlf and
1271            // IntegerStart) so a resumed parse picks up where the
1272            // caller left off without re-consuming a byte.
1273            RspState::Status => {
1274                state = RspState::RuntoCrlf;
1275            }
1276            RspState::Error => {
1277                if token.is_none() {
1278                    if ch != b'-' {
1279                        return finish_rsp_error(
1280                            r,
1281                            state,
1282                            p,
1283                            token,
1284                            rlen,
1285                            rntokens,
1286                            ty,
1287                            integer,
1288                            ntoken_start,
1289                            ntoken_end,
1290                        );
1291                    }
1292                    token = Some(p);
1293                    p += 1;
1294                } else if ch == b' ' || ch == CR {
1295                    let start = token.expect("token recorded");
1296                    if let Some(t) = error_lookup(&input[start..p]) {
1297                        ty = t;
1298                    }
1299                    state = RspState::RuntoCrlf;
1300                    token = None;
1301                    // Do not advance.
1302                } else {
1303                    p += 1;
1304                }
1305            }
1306            RspState::Integer => {
1307                // Resume-from-state entry: see RspState::Status.
1308                state = RspState::IntegerStart;
1309                integer = 0;
1310                int_negative = false;
1311            }
1312            RspState::IntegerStart => {
1313                if ch == CR {
1314                    if int_negative {
1315                        integer = -integer;
1316                    }
1317                    state = RspState::AlmostDone;
1318                    p += 1;
1319                } else if ch == b'-' {
1320                    int_negative = true;
1321                    p += 1;
1322                } else if ch.is_ascii_digit() {
1323                    integer = integer
1324                        .saturating_mul(10)
1325                        .saturating_add(i64::from(ch - b'0'));
1326                    p += 1;
1327                } else {
1328                    return finish_rsp_error(
1329                        r,
1330                        state,
1331                        p,
1332                        token,
1333                        rlen,
1334                        rntokens,
1335                        ty,
1336                        integer,
1337                        ntoken_start,
1338                        ntoken_end,
1339                    );
1340                }
1341            }
1342            RspState::Simple => {
1343                if ch == CR {
1344                    state = RspState::MultibulkArgnLf;
1345                    rntokens = rntokens.saturating_sub(1);
1346                }
1347                p += 1;
1348            }
1349            RspState::RuntoCrlf => {
1350                if ch == CR {
1351                    state = RspState::AlmostDone;
1352                    p += 1;
1353                } else {
1354                    p += 1;
1355                }
1356            }
1357            RspState::AlmostDone => {
1358                if ch != LF {
1359                    return finish_rsp_error(
1360                        r,
1361                        state,
1362                        p,
1363                        token,
1364                        rlen,
1365                        rntokens,
1366                        ty,
1367                        integer,
1368                        ntoken_start,
1369                        ntoken_end,
1370                    );
1371                }
1372                p += 1;
1373                return finish_rsp_ok(r, p, ty, integer, ntoken_start, ntoken_end);
1374            }
1375            RspState::Bulk => {
1376                if token.is_none() {
1377                    if ch != b'$' {
1378                        return finish_rsp_error(
1379                            r,
1380                            state,
1381                            p,
1382                            token,
1383                            rlen,
1384                            rntokens,
1385                            ty,
1386                            integer,
1387                            ntoken_start,
1388                            ntoken_end,
1389                        );
1390                    }
1391                    token = Some(p);
1392                    rlen = 0;
1393                    p += 1;
1394                } else if ch == b'-' {
1395                    state = RspState::RuntoCrlf;
1396                    p += 1;
1397                } else if ch.is_ascii_digit() {
1398                    rlen = rlen.saturating_mul(10).saturating_add(u32::from(ch - b'0'));
1399                    p += 1;
1400                } else if ch == CR {
1401                    let start = token.expect("token recorded");
1402                    if p - start <= 1 {
1403                        return finish_rsp_error(
1404                            r,
1405                            state,
1406                            p,
1407                            token,
1408                            rlen,
1409                            rntokens,
1410                            ty,
1411                            integer,
1412                            ntoken_start,
1413                            ntoken_end,
1414                        );
1415                    }
1416                    token = None;
1417                    state = RspState::BulkLf;
1418                    p += 1;
1419                } else {
1420                    return finish_rsp_error(
1421                        r,
1422                        state,
1423                        p,
1424                        token,
1425                        rlen,
1426                        rntokens,
1427                        ty,
1428                        integer,
1429                        ntoken_start,
1430                        ntoken_end,
1431                    );
1432                }
1433            }
1434            RspState::BulkLf => {
1435                if ch == LF {
1436                    state = RspState::BulkArg;
1437                    p += 1;
1438                } else {
1439                    return finish_rsp_error(
1440                        r,
1441                        state,
1442                        p,
1443                        token,
1444                        rlen,
1445                        rntokens,
1446                        ty,
1447                        integer,
1448                        ntoken_start,
1449                        ntoken_end,
1450                    );
1451                }
1452            }
1453            RspState::BulkArg => {
1454                let needed = match p.checked_add(rlen as usize) {
1455                    Some(n) => n,
1456                    None => {
1457                        return finish_rsp_error(
1458                            r,
1459                            state,
1460                            p,
1461                            token,
1462                            rlen,
1463                            rntokens,
1464                            ty,
1465                            integer,
1466                            ntoken_start,
1467                            ntoken_end,
1468                        );
1469                    }
1470                };
1471                if needed >= input.len() {
1472                    p = input.len();
1473                    break;
1474                }
1475                if input[needed] != CR {
1476                    return finish_rsp_error(
1477                        r,
1478                        state,
1479                        p,
1480                        token,
1481                        rlen,
1482                        rntokens,
1483                        ty,
1484                        integer,
1485                        ntoken_start,
1486                        ntoken_end,
1487                    );
1488                }
1489                p = needed + 1;
1490                rlen = 0;
1491                state = RspState::BulkArgLf;
1492            }
1493            RspState::BulkArgLf => {
1494                if ch != CR && ch != LF {
1495                    return finish_rsp_error(
1496                        r,
1497                        state,
1498                        p,
1499                        token,
1500                        rlen,
1501                        rntokens,
1502                        ty,
1503                        integer,
1504                        ntoken_start,
1505                        ntoken_end,
1506                    );
1507                }
1508                if ch == CR {
1509                    p += 1;
1510                    continue;
1511                }
1512                p += 1;
1513                return finish_rsp_ok(r, p, ty, integer, ntoken_start, ntoken_end);
1514            }
1515            RspState::Multibulk => {
1516                if token.is_none() {
1517                    if ch != b'*' {
1518                        return finish_rsp_error(
1519                            r,
1520                            state,
1521                            p,
1522                            token,
1523                            rlen,
1524                            rntokens,
1525                            ty,
1526                            integer,
1527                            ntoken_start,
1528                            ntoken_end,
1529                        );
1530                    }
1531                    token = Some(p);
1532                    ntoken_start = Some(p);
1533                    rntokens = 0;
1534                    p += 1;
1535                } else if ch == b'-' {
1536                    state = RspState::RuntoCrlf;
1537                    p += 1;
1538                } else if ch.is_ascii_digit() {
1539                    rntokens = rntokens
1540                        .saturating_mul(10)
1541                        .saturating_add(u32::from(ch - b'0'));
1542                    p += 1;
1543                } else if ch == CR {
1544                    let start = token.expect("token recorded");
1545                    if p - start <= 1 {
1546                        return finish_rsp_error(
1547                            r,
1548                            state,
1549                            p,
1550                            token,
1551                            rlen,
1552                            rntokens,
1553                            ty,
1554                            integer,
1555                            ntoken_start,
1556                            ntoken_end,
1557                        );
1558                    }
1559                    ntoken_end = Some(p);
1560                    token = None;
1561                    state = RspState::MultibulkNargLf;
1562                    p += 1;
1563                } else {
1564                    return finish_rsp_error(
1565                        r,
1566                        state,
1567                        p,
1568                        token,
1569                        rlen,
1570                        rntokens,
1571                        ty,
1572                        integer,
1573                        ntoken_start,
1574                        ntoken_end,
1575                    );
1576                }
1577            }
1578            RspState::MultibulkNargLf => {
1579                if ch != LF {
1580                    return finish_rsp_error(
1581                        r,
1582                        state,
1583                        p,
1584                        token,
1585                        rlen,
1586                        rntokens,
1587                        ty,
1588                        integer,
1589                        ntoken_start,
1590                        ntoken_end,
1591                    );
1592                }
1593                p += 1;
1594                if rntokens == 0 {
1595                    return finish_rsp_ok(r, p, ty, integer, ntoken_start, ntoken_end);
1596                }
1597                state = RspState::MultibulkArgnLen;
1598            }
1599            RspState::MultibulkArgnLen => {
1600                if token.is_none() {
1601                    if ch == b'*' {
1602                        state = RspState::Multibulk;
1603                        // Do not advance.
1604                        continue;
1605                    }
1606                    if ch == b':' || ch == b'+' || ch == b'-' {
1607                        state = RspState::Simple;
1608                        // Do not advance.
1609                        continue;
1610                    }
1611                    if ch != b'$' {
1612                        return finish_rsp_error(
1613                            r,
1614                            state,
1615                            p,
1616                            token,
1617                            rlen,
1618                            rntokens,
1619                            ty,
1620                            integer,
1621                            ntoken_start,
1622                            ntoken_end,
1623                        );
1624                    }
1625                    token = Some(p);
1626                    rlen = 0;
1627                    p += 1;
1628                } else if ch.is_ascii_digit() {
1629                    rlen = rlen.saturating_mul(10).saturating_add(u32::from(ch - b'0'));
1630                    p += 1;
1631                } else if ch == b'-' {
1632                    p += 1;
1633                } else if ch == CR {
1634                    let start = token.expect("token recorded");
1635                    if p - start <= 1 || rntokens == 0 {
1636                        return finish_rsp_error(
1637                            r,
1638                            state,
1639                            p,
1640                            token,
1641                            rlen,
1642                            rntokens,
1643                            ty,
1644                            integer,
1645                            ntoken_start,
1646                            ntoken_end,
1647                        );
1648                    }
1649                    if rlen == 1 && p - start == 3 {
1650                        rlen = 0;
1651                        state = RspState::MultibulkArgnLf;
1652                    } else {
1653                        state = RspState::MultibulkArgnLenLf;
1654                    }
1655                    rntokens = rntokens.saturating_sub(1);
1656                    token = None;
1657                    p += 1;
1658                } else {
1659                    return finish_rsp_error(
1660                        r,
1661                        state,
1662                        p,
1663                        token,
1664                        rlen,
1665                        rntokens,
1666                        ty,
1667                        integer,
1668                        ntoken_start,
1669                        ntoken_end,
1670                    );
1671                }
1672            }
1673            RspState::MultibulkArgnLenLf => {
1674                if ch == LF {
1675                    state = RspState::MultibulkArgn;
1676                    p += 1;
1677                } else {
1678                    return finish_rsp_error(
1679                        r,
1680                        state,
1681                        p,
1682                        token,
1683                        rlen,
1684                        rntokens,
1685                        ty,
1686                        integer,
1687                        ntoken_start,
1688                        ntoken_end,
1689                    );
1690                }
1691            }
1692            RspState::MultibulkArgn => {
1693                let needed = match p.checked_add(rlen as usize) {
1694                    Some(n) => n,
1695                    None => {
1696                        return finish_rsp_error(
1697                            r,
1698                            state,
1699                            p,
1700                            token,
1701                            rlen,
1702                            rntokens,
1703                            ty,
1704                            integer,
1705                            ntoken_start,
1706                            ntoken_end,
1707                        );
1708                    }
1709                };
1710                if needed >= input.len() {
1711                    p = input.len();
1712                    break;
1713                }
1714                if input[needed] != CR {
1715                    return finish_rsp_error(
1716                        r,
1717                        state,
1718                        p,
1719                        token,
1720                        rlen,
1721                        rntokens,
1722                        ty,
1723                        integer,
1724                        ntoken_start,
1725                        ntoken_end,
1726                    );
1727                }
1728                if rlen > 0 {
1729                    r.push_arg(ArgPos::new(input[p..needed].to_vec()));
1730                }
1731                p = needed + 1;
1732                rlen = 0;
1733                state = RspState::MultibulkArgnLf;
1734            }
1735            RspState::MultibulkArgnLf => {
1736                if ch != LF {
1737                    return finish_rsp_error(
1738                        r,
1739                        state,
1740                        p,
1741                        token,
1742                        rlen,
1743                        rntokens,
1744                        ty,
1745                        integer,
1746                        ntoken_start,
1747                        ntoken_end,
1748                    );
1749                }
1750                p += 1;
1751                if rntokens == 0 {
1752                    return finish_rsp_ok(r, p, ty, integer, ntoken_start, ntoken_end);
1753                }
1754                state = RspState::MultibulkArgnLen;
1755            }
1756        }
1757    }
1758
1759    r.set_parser_state(state as u32);
1760    r.set_parser_pos(p);
1761    r.set_parser_token(token);
1762    r.set_rlen(rlen);
1763    r.set_rntokens(rntokens);
1764    r.set_integer(integer);
1765    r.set_ntoken_span(ntoken_start, ntoken_end);
1766    if ty != MsgType::Unknown {
1767        r.set_type(ty);
1768    }
1769    r.flags_mut().is_error = super::commands::is_redis_error(ty);
1770    r.set_parse_result(MsgParseResult::Again);
1771    MsgParseResult::Again
1772}
1773
1774fn finish_rsp_ok(
1775    r: &mut Msg,
1776    pos: usize,
1777    ty: MsgType,
1778    integer: i64,
1779    ntoken_start: Option<usize>,
1780    ntoken_end: Option<usize>,
1781) -> MsgParseResult {
1782    if ty == MsgType::Unknown {
1783        r.set_parse_result(MsgParseResult::Error);
1784        return MsgParseResult::Error;
1785    }
1786    r.set_type(ty);
1787    r.set_integer(integer);
1788    r.set_ntoken_span(ntoken_start, ntoken_end);
1789    r.set_parser_state(0);
1790    r.set_parser_pos(pos);
1791    r.set_parser_token(None);
1792    r.flags_mut().is_error = super::commands::is_redis_error(ty);
1793    r.set_parse_result(MsgParseResult::Ok);
1794    MsgParseResult::Ok
1795}
1796
1797#[allow(clippy::too_many_arguments)]
1798fn finish_rsp_error(
1799    r: &mut Msg,
1800    state: RspState,
1801    pos: usize,
1802    token: Option<usize>,
1803    rlen: u32,
1804    rntokens: u32,
1805    ty: MsgType,
1806    integer: i64,
1807    ntoken_start: Option<usize>,
1808    ntoken_end: Option<usize>,
1809) -> MsgParseResult {
1810    r.set_parser_state(state as u32);
1811    r.set_parser_pos(pos);
1812    r.set_parser_token(token);
1813    r.set_rlen(rlen);
1814    r.set_rntokens(rntokens);
1815    r.set_integer(integer);
1816    r.set_ntoken_span(ntoken_start, ntoken_end);
1817    if ty != MsgType::Unknown {
1818        r.set_type(ty);
1819    }
1820    r.set_parse_result(MsgParseResult::Error);
1821    MsgParseResult::Error
1822}
1823
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `input` to the request parser on a fresh message and
    /// returns that message for inspection.
    fn parse_req(input: &[u8]) -> Msg {
        let mut msg = Msg::new(0, MsgType::Unknown, true);
        let _ = redis_parse_req(&mut msg, input);
        msg
    }

    /// Feeds `input` to the response parser on a fresh message and
    /// returns that message for inspection.
    fn parse_rsp(input: &[u8]) -> Msg {
        let mut msg = Msg::new(0, MsgType::Unknown, false);
        let _ = redis_parse_rsp(&mut msg, input);
        msg
    }

    /// Asserts that `msg` parsed successfully and was classified as
    /// `expected`.
    #[track_caller]
    fn assert_ok(msg: &Msg, expected: MsgType) {
        assert_eq!(msg.parse_result(), MsgParseResult::Ok);
        assert_eq!(msg.ty(), expected);
    }

    /// Collects the raw bytes of every key recorded on `msg`.
    fn key_bytes(msg: &Msg) -> Vec<&[u8]> {
        msg.keys().iter().map(KeyPos::key).collect()
    }

    // ---- requests ----

    #[test]
    fn parse_get() {
        let msg = parse_req(b"*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n");
        assert_ok(&msg, MsgType::ReqRedisGet);
        assert_eq!(msg.keys()[0].key(), b"foo");
    }

    #[test]
    fn parse_set() {
        let msg = parse_req(b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n");
        assert_ok(&msg, MsgType::ReqRedisSet);
        assert_eq!(msg.keys()[0].key(), b"foo");
        assert_eq!(msg.args()[0].bytes(), b"bar");
    }

    #[test]
    fn parse_del_multikey() {
        let msg = parse_req(b"*4\r\n$3\r\nDEL\r\n$1\r\na\r\n$1\r\nb\r\n$1\r\nc\r\n");
        assert_ok(&msg, MsgType::ReqRedisDel);
        assert_eq!(key_bytes(&msg), vec![&b"a"[..], b"b", b"c"]);
    }

    #[test]
    fn parse_mset() {
        let msg =
            parse_req(b"*5\r\n$4\r\nMSET\r\n$1\r\na\r\n$1\r\n1\r\n$1\r\nb\r\n$1\r\n2\r\n");
        assert_ok(&msg, MsgType::ReqRedisMset);
        // Only the keys are checked here; the interleaved values are not.
        assert_eq!(key_bytes(&msg), vec![&b"a"[..], b"b"]);
    }

    #[test]
    fn parse_ping_inline() {
        let msg = parse_req(b"PING\r\n");
        assert_ok(&msg, MsgType::ReqRedisPing);
    }

    #[test]
    fn parse_ping_resp() {
        let msg = parse_req(b"*1\r\n$4\r\nPING\r\n");
        assert_ok(&msg, MsgType::ReqRedisPing);
    }

    #[test]
    fn parse_unknown_command_errors() {
        let msg = parse_req(b"*1\r\n$3\r\nFOO\r\n");
        assert_eq!(msg.parse_result(), MsgParseResult::Error);
    }

    #[test]
    fn parse_truncated_returns_again() {
        // The final bulk announces three bytes but only two are present.
        let msg = parse_req(b"*2\r\n$3\r\nGET\r\n$3\r\nfo");
        assert_eq!(msg.parse_result(), MsgParseResult::Again);
    }

    // ---- responses ----

    #[test]
    fn parse_status_response() {
        let msg = parse_rsp(b"+OK\r\n");
        assert_ok(&msg, MsgType::RspRedisStatus);
    }

    #[test]
    fn parse_error_response_classifies() {
        let msg = parse_rsp(b"-WRONGTYPE Operation not allowed\r\n");
        assert_ok(&msg, MsgType::RspRedisErrorWrongtype);
        assert!(msg.flags().is_error);
    }

    #[test]
    fn parse_integer_response() {
        let msg = parse_rsp(b":42\r\n");
        assert_ok(&msg, MsgType::RspRedisInteger);
        assert_eq!(msg.integer(), 42);
    }

    #[test]
    fn parse_bulk_response() {
        let msg = parse_rsp(b"$5\r\nhello\r\n");
        assert_ok(&msg, MsgType::RspRedisBulk);
    }

    #[test]
    fn parse_null_bulk_response() {
        let msg = parse_rsp(b"$-1\r\n");
        assert_ok(&msg, MsgType::RspRedisBulk);
    }

    #[test]
    fn parse_empty_multibulk_response() {
        let msg = parse_rsp(b"*0\r\n");
        assert_ok(&msg, MsgType::RspRedisMultibulk);
    }
}