Skip to main content

kevy_resp/
request.rs

1//! Request-side parser: turns a byte stream from a client into an [`Argv`].
2//!
3//! Handles the two RESP2 request forms — `*N\r\n$L\r\n…` multi-bulk (the
4//! normal client encoding) and the inline form (whitespace-separated, a
5//! convenience for raw-typed PING / DEBUG / etc). Parsing is incremental:
6//! returning `Ok(None)` asks the caller to read more bytes and retry.
7
8use crate::argv::{Argv, Command};
9use crate::error::ProtocolError;
10
11/// Attempt to parse one command from the front of `buf`.
12///
13/// - `Ok(Some((cmd, consumed)))` — a full command; `consumed` bytes may be dropped.
14/// - `Ok(None)` — need more bytes; call again after reading more.
15/// - `Err(_)` — the stream is corrupt; the caller should reply with an error
16///   and close the connection.
17///
18/// This is the convenience form that allocates a fresh `Argv` per call. The
19/// reactor's hot path uses [`parse_command_into`] with a reused scratch
20/// `Argv` to keep per-cmd malloc rate at 0.
21pub fn parse_command(buf: &[u8]) -> Result<Option<(Command, usize)>, ProtocolError> {
22    let mut argv = Argv::default();
23    match parse_command_into(buf, &mut argv)? {
24        Some(consumed) => Ok(Some((argv, consumed))),
25        None => Ok(None),
26    }
27}
28
29/// Same as [`parse_command`], but writes into a caller-provided scratch
30/// `Argv` instead of allocating a new one each call. The reactor stores one
31/// `Argv` per shard and reuses it for every cmd on the local hot path; the
32/// internal `Vec<u8>` + `Vec<u32>` capacities amortise to zero allocations
33/// per command after the first few cmds warm them.
34///
35/// `dst` is cleared at the start of every call; on `Ok(None)` and `Err`, `dst`
36/// is left empty (so the caller doesn't see partial state).
37pub fn parse_command_into(buf: &[u8], dst: &mut Argv) -> Result<Option<usize>, ProtocolError> {
38    dst.clear();
39    if buf.is_empty() {
40        return Ok(None);
41    }
42    if buf[0] == b'*' {
43        parse_multibulk_into(buf, dst)
44    } else {
45        parse_inline_into(buf, dst)
46    }
47}
48
49// Signature mirrors `parse_multibulk_into` so `parse_command_into` can dispatch
50// on the leading byte without converting between Result and Option shapes —
51// the inline path can't actually fail, but the Result wrap is a price we pay
52// for arm symmetry, not a hidden error path.
53#[allow(clippy::unnecessary_wraps)]
54fn parse_inline_into(buf: &[u8], dst: &mut Argv) -> Result<Option<usize>, ProtocolError> {
55    let Some(eol) = find_crlf(buf, 0) else {
56        return Ok(None);
57    };
58    let line = &buf[..eol];
59    for tok in line
60        .split(u8::is_ascii_whitespace)
61        .filter(|s| !s.is_empty())
62    {
63        dst.push(tok);
64    }
65    Ok(Some(eol + 2))
66}
67
68/// Validate the multi-bulk frame is fully present and report `(end_pos,
69/// total_arg_bytes)` if so. `start_pos` is the offset of the first `$`
70/// after the `*N\r\n` header. `Ok(None)` = need more bytes; `Err` = malformed.
71pub(crate) fn validate_multibulk_frame(
72    buf: &[u8],
73    start_pos: usize,
74    count: usize,
75) -> Result<Option<(usize, usize)>, ProtocolError> {
76    let mut pos = start_pos;
77    let mut total = 0usize;
78    for _ in 0..count {
79        if pos >= buf.len() {
80            return Ok(None);
81        }
82        if buf[pos] != b'$' {
83            return Err(ProtocolError::Malformed("expected bulk string"));
84        }
85        let Some(len_end) = find_crlf(buf, pos + 1) else {
86            return Ok(None);
87        };
88        let len = parse_int(&buf[pos + 1..len_end])
89            .ok_or(ProtocolError::Malformed("bad bulk length"))?;
90        if len < 0 {
91            return Err(ProtocolError::Malformed("negative bulk length in request"));
92        }
93        let len = len as usize;
94        let data_end = len_end + 2 + len;
95        if buf.len() < data_end + 2 {
96            return Ok(None);
97        }
98        if &buf[data_end..data_end + 2] != b"\r\n" {
99            return Err(ProtocolError::Malformed("bulk string not CRLF-terminated"));
100        }
101        total += len;
102        pos = data_end + 2;
103    }
104    Ok(Some((pos, total)))
105}
106
107/// Copy `count` already-validated bulk args from `buf[start_pos..]` into `dst`.
108/// Caller must have called [`validate_multibulk_frame`] first.
109fn copy_multibulk_args(buf: &[u8], start_pos: usize, count: usize, dst: &mut Argv) {
110    let mut p = start_pos;
111    for _ in 0..count {
112        let len_end = find_crlf(buf, p + 1).expect("validated in pass 1");
113        let len = parse_int(&buf[p + 1..len_end]).expect("validated in pass 1") as usize;
114        let data_start = len_end + 2;
115        dst.push(&buf[data_start..data_start + len]);
116        p = data_start + len + 2;
117    }
118}
119
120fn parse_multibulk_into(buf: &[u8], dst: &mut Argv) -> Result<Option<usize>, ProtocolError> {
121    let Some(hdr_end) = find_crlf(buf, 1) else {
122        return Ok(None);
123    };
124    let count =
125        parse_int(&buf[1..hdr_end]).ok_or(ProtocolError::Malformed("bad multibulk count"))?;
126    if count < 0 {
127        // Null array → empty argv (already cleared).
128        return Ok(Some(hdr_end + 2));
129    }
130    let count = count as usize;
131    let start = hdr_end + 2;
132
133    let Some((end_pos, total)) = validate_multibulk_frame(buf, start, count)? else {
134        return Ok(None);
135    };
136
137    // `reserve` is a no-op when the scratch Argv has already amortised
138    // enough capacity from earlier cmds.
139    dst.reserve_for(count, total);
140    copy_multibulk_args(buf, start, count, dst);
141    Ok(Some(end_pos))
142}
143
144/// Parse a bulk-string length header `$<len>\r\n` whose `$` sits at
145/// `buf[pos]` (the caller has already checked that byte). One fused pass:
146/// the digits accumulate while the same loop walks to the terminating
147/// CRLF — bulk headers are 2-21 bytes, so this short byte loop beats the
148/// `find_crlf` + [`parse_int`] double scan the two-pass parser paid per
149/// arg. Accepts the same shapes as `parse_int` (optional `+`/`-` sign,
150/// checked i64 accumulation); a negative length is malformed in a
151/// request, matching [`validate_multibulk_frame`].
152///
153/// Returns `(len, data_start)`; `Ok(None)` = need more bytes.
154#[inline]
155pub(crate) fn parse_bulk_len(
156    buf: &[u8],
157    pos: usize,
158) -> Result<Option<(usize, usize)>, ProtocolError> {
159    let mut q = pos + 1;
160    let neg = match buf.get(q) {
161        None => return Ok(None),
162        Some(b'-') => {
163            q += 1;
164            true
165        }
166        Some(b'+') => {
167            q += 1;
168            false
169        }
170        _ => false,
171    };
172    let digits_start = q;
173    let mut acc: i64 = 0;
174    loop {
175        match buf.get(q) {
176            None => return Ok(None),
177            Some(&b) if b.is_ascii_digit() => {
178                acc = acc
179                    .checked_mul(10)
180                    .and_then(|a| a.checked_add(i64::from(b - b'0')))
181                    .ok_or(ProtocolError::Malformed("bad bulk length"))?;
182                q += 1;
183            }
184            Some(b'\r') => break,
185            Some(_) => return Err(ProtocolError::Malformed("bad bulk length")),
186        }
187    }
188    if q == digits_start {
189        return Err(ProtocolError::Malformed("bad bulk length"));
190    }
191    match buf.get(q + 1) {
192        None => return Ok(None),
193        Some(b'\n') => {}
194        Some(_) => return Err(ProtocolError::Malformed("bad bulk length")),
195    }
196    if neg {
197        return Err(ProtocolError::Malformed("negative bulk length in request"));
198    }
199    Ok(Some((acc as usize, q + 2)))
200}
201
202/// Find the index of `\r\n` at or after `start`, returning the index of `\r`.
203///
204/// A6 + A7 (2026-06-20): delegates to `kevy_bytes::find_crlf`, which picks
205/// AVX2 (x86_64 runtime-detected) / NEON (aarch64 baseline) / u64 SWAR
206/// fallback. The previous in-crate SWAR loop is now the fallback tier
207/// of that dispatch. Pulling the SIMD path into kevy-bytes keeps this
208/// crate under #![forbid(unsafe_code)] — kevy-bytes already wraps
209/// SmallBytes' unsafe union work so it's the right home for arch
210/// intrinsics.
211#[inline]
212pub(crate) fn find_crlf(buf: &[u8], start: usize) -> Option<usize> {
213    kevy_bytes::find_crlf(buf, start)
214}
215
/// Parse a base-10 signed integer from ASCII bytes (no surrounding whitespace).
///
/// Accepts an optional leading `+` or `-` followed by one or more ASCII
/// digits. Returns `None` for empty input, a bare sign, any other byte, or a
/// value outside the `i64` range.
///
/// The value is accumulated with its sign already applied. This lets the
/// full range, including `i64::MIN`, round-trip. Accumulating a positive
/// magnitude and negating at the end cannot represent
/// `9223372036854775808`.
#[inline]
pub(crate) fn parse_int(bytes: &[u8]) -> Option<i64> {
    let (neg, digits) = match bytes.split_first()? {
        (&b'-', rest) => (true, rest),
        (&b'+', rest) => (false, rest),
        _ => (false, bytes),
    };
    if digits.is_empty() {
        return None;
    }
    digits.iter().try_fold(0i64, |acc, &b| {
        if !b.is_ascii_digit() {
            return None;
        }
        let digit = i64::from(b - b'0');
        let shifted = acc.checked_mul(10)?;
        if neg {
            shifted.checked_sub(digit)
        } else {
            shifted.checked_add(digit)
        }
    })
}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encode_command;

    // find_crlf boundary sweep: a CRLF planted at every offset 0..40. The
    // dispatched scanners work in fixed-width windows (u64 SWAR words on the
    // fallback tier, SIMD vectors on AVX2/NEON). Offsets that straddle a
    // window edge (7/8, 15/16, 31/32, …) are where an off-by-one would show.
    #[test]
    fn find_crlf_at_every_offset() {
        for off in 0..40 {
            let mut buf = vec![b'a'; 60];
            buf[off] = b'\r';
            buf[off + 1] = b'\n';
            assert_eq!(find_crlf(&buf, 0), Some(off), "off={off}");
        }
    }

    #[test]
    fn find_crlf_skips_lone_cr() {
        // Lone \r at the front, then a real CRLF later.
        let mut buf = vec![b'a'; 32];
        buf[3] = b'\r';
        buf[4] = b'b'; // not \n → skip
        buf[20] = b'\r';
        buf[21] = b'\n';
        assert_eq!(find_crlf(&buf, 0), Some(20));
    }

    #[test]
    fn find_crlf_none_when_absent() {
        let buf = vec![b'a'; 32];
        assert_eq!(find_crlf(&buf, 0), None);
        let buf = b"";
        assert_eq!(find_crlf(buf, 0), None);
        let buf = b"\r"; // only CR, no LF available
        assert_eq!(find_crlf(buf, 0), None);
    }

    #[test]
    fn find_crlf_at_buffer_end() {
        let buf = b"abcdefghij\r\n"; // CRLF at offset 10
        assert_eq!(find_crlf(buf, 0), Some(10));
        // Start past the CR.
        assert_eq!(find_crlf(buf, 11), None);
    }

    #[test]
    fn find_crlf_with_many_lone_crs() {
        // 7 lone CRs followed by a real CRLF. Every lone CR is a candidate
        // the scanner must reject before it reaches the real pair.
        let mut buf = Vec::new();
        for _ in 0..7 {
            buf.push(b'\r');
            buf.push(b'x'); // not \n
        }
        buf.extend_from_slice(b"\r\n");
        // Real CRLF starts at offset 14 (7 * 2).
        assert_eq!(find_crlf(&buf, 0), Some(14));
    }

    #[test]
    fn find_crlf_from_nonzero_start() {
        let buf = b"\r\n\r\n\r\n";
        // Starts at offset 0 → first CRLF.
        assert_eq!(find_crlf(buf, 0), Some(0));
        // Skip the first CRLF.
        assert_eq!(find_crlf(buf, 2), Some(2));
        assert_eq!(find_crlf(buf, 4), Some(4));
    }

    #[test]
    fn parse_multibulk_ping() {
        let (cmd, used) = parse_command(b"*1\r\n$4\r\nPING\r\n").unwrap().unwrap();
        assert_eq!(cmd, vec![b"PING".to_vec()]);
        assert_eq!(used, 14);
    }

    #[test]
    fn parse_multibulk_echo() {
        let frame = b"*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n";
        let (cmd, used) = parse_command(frame).unwrap().unwrap();
        assert_eq!(cmd, vec![b"ECHO".to_vec(), b"hello".to_vec()]);
        assert_eq!(used, frame.len());
    }

    #[test]
    fn parse_incomplete_returns_none() {
        assert_eq!(parse_command(b"*1\r\n$4\r\nPI").unwrap(), None);
        assert_eq!(parse_command(b"*2\r\n$4\r\nECHO\r\n").unwrap(), None);
        assert_eq!(parse_command(b"").unwrap(), None);
    }

    #[test]
    fn parse_inline_command() {
        let (cmd, used) = parse_command(b"PING\r\n").unwrap().unwrap();
        assert_eq!(cmd, vec![b"PING".to_vec()]);
        assert_eq!(used, 6);
        let (cmd, _) = parse_command(b"ECHO  hi there\r\n").unwrap().unwrap();
        assert_eq!(
            cmd,
            vec![b"ECHO".to_vec(), b"hi".to_vec(), b"there".to_vec()]
        );
    }

    #[test]
    fn parse_inline_edge_cases() {
        // No line terminator yet → need more bytes.
        assert_eq!(parse_command(b"PING").unwrap(), None);
        // A blank line is a complete, empty command.
        let (cmd, used) = parse_command(b"\r\n").unwrap().unwrap();
        assert_eq!(cmd, Vec::<Vec<u8>>::new());
        assert_eq!(used, 2);
    }

    #[test]
    fn parse_malformed_errors() {
        assert!(parse_command(b"*1\r\n+OK\r\n").is_err());
        assert!(parse_command(b"*x\r\n").is_err());
    }

    #[test]
    fn parse_rejects_bad_bulk_headers() {
        // Negative bulk length.
        assert!(parse_command(b"*1\r\n$-1\r\n").is_err());
        // Non-numeric bulk length.
        assert!(parse_command(b"*1\r\n$x\r\nabc\r\n").is_err());
        // Payload longer than declared: no CRLF where the payload should end.
        assert!(parse_command(b"*1\r\n$3\r\nabcd\r\n").is_err());
    }

    #[test]
    fn parse_null_multibulk_is_empty_argv() {
        let (cmd, used) = parse_command(b"*-1\r\n").unwrap().unwrap();
        assert_eq!(cmd, Vec::<Vec<u8>>::new());
        assert_eq!(used, 5);
    }

    #[cfg(target_pointer_width = "64")]
    #[test]
    fn parse_huge_bulk_length_waits_for_more_bytes() {
        // A declared length of i64::MAX must not overflow the offset math;
        // the frame just cannot be complete yet.
        assert_eq!(
            parse_command(b"*1\r\n$9223372036854775807\r\n").unwrap(),
            None
        );
    }

    #[test]
    fn parse_command_into_leaves_dst_empty_without_a_command() {
        let mut dst = Argv::default();
        let frame = b"*1\r\n$4\r\nPING\r\n";
        assert_eq!(
            parse_command_into(frame, &mut dst).unwrap(),
            Some(frame.len())
        );
        assert_eq!(dst, vec![b"PING".to_vec()]);
        // Incomplete frame: the previous command must not leak through.
        assert_eq!(
            parse_command_into(b"*2\r\n$4\r\nECHO\r\n", &mut dst).unwrap(),
            None
        );
        assert_eq!(dst, Vec::<Vec<u8>>::new());
        // Malformed frame: same guarantee.
        assert!(parse_command_into(b"*1\r\n$-1\r\n", &mut dst).is_err());
        assert_eq!(dst, Vec::<Vec<u8>>::new());
    }

    #[test]
    fn validate_multibulk_frame_reports_end_and_total() {
        let frame = b"*2\r\n$3\r\nSET\r\n$1\r\nk\r\n";
        // Args start right after the 4-byte "*2\r\n" header; payloads total 3 + 1.
        assert_eq!(
            validate_multibulk_frame(frame, 4, 2).unwrap(),
            Some((frame.len(), 4))
        );
        // Dropping the final byte leaves the last arg incomplete.
        assert_eq!(
            validate_multibulk_frame(&frame[..frame.len() - 1], 4, 2).unwrap(),
            None
        );
    }

    #[test]
    fn parse_bulk_len_shapes() {
        assert_eq!(parse_bulk_len(b"$5\r\nhello", 0).unwrap(), Some((5, 4)));
        assert_eq!(parse_bulk_len(b"xx$+12\r\n", 2).unwrap(), Some((12, 8)));
        assert_eq!(parse_bulk_len(b"$0\r\n", 0).unwrap(), Some((0, 4)));
        // Incomplete headers ask for more bytes.
        assert_eq!(parse_bulk_len(b"$", 0).unwrap(), None);
        assert_eq!(parse_bulk_len(b"$12", 0).unwrap(), None);
        assert_eq!(parse_bulk_len(b"$12\r", 0).unwrap(), None);
        // Malformed headers.
        assert!(parse_bulk_len(b"$\r\n", 0).is_err());
        assert!(parse_bulk_len(b"$1x\r\n", 0).is_err());
        assert!(parse_bulk_len(b"$1\rx", 0).is_err());
        assert!(parse_bulk_len(b"$-1\r\n", 0).is_err());
        assert!(parse_bulk_len(b"$99999999999999999999\r\n", 0).is_err());
    }

    #[test]
    fn round_trip_command() {
        let mut buf = Vec::new();
        encode_command(&mut buf, &[b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]);
        let (cmd, used) = parse_command(&buf).unwrap().unwrap();
        assert_eq!(cmd, vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]);
        assert_eq!(used, buf.len());
    }
}