Skip to main content

kevy_resp/
request_borrowed.rs

1//! Borrowed-buffer request parser: zero-copy alternative to [`parse_command`].
2//!
3//! Records each arg as a `(start, end)` range into the caller's input buffer
4//! rather than copying its bytes. Pairs with [`crate::ArgvBorrowed`].
5
6use crate::argv_borrowed::ArgvBorrowed;
7use crate::error::ProtocolError;
8use crate::request::{find_crlf, parse_bulk_len, parse_int};
9
10/// Parse one command from the front of `buf`, recording each arg as a
11/// `(start, end)` range into `buf` rather than copying its bytes.
12///
13/// The returned [`ArgvBorrowed`] is a zero-copy view: `get(i)` slices directly
14/// into `buf`. Use this on the local single-shard hot path; call
15/// [`ArgvBorrowed::into_owned`] before storing an argv past the buffer's
16/// lifetime (cross-shard dispatch, MULTI queue, AOF logging).
17///
18/// Return shape matches [`crate::parse_command`]: `Ok(Some((argv, consumed)))`,
19/// `Ok(None)` if more bytes are needed, `Err` on malformed input.
20pub fn parse_command_borrowed(
21    buf: &[u8],
22) -> Result<Option<(ArgvBorrowed<'_>, usize)>, ProtocolError> {
23    if buf.is_empty() {
24        return Ok(None);
25    }
26    if buf[0] == b'*' {
27        parse_multibulk_borrowed(buf)
28    } else {
29        parse_inline_borrowed(buf)
30    }
31}
32
33fn parse_inline_borrowed(
34    buf: &[u8],
35) -> Result<Option<(ArgvBorrowed<'_>, usize)>, ProtocolError> {
36    let Some(eol) = find_crlf(buf, 0) else {
37        return Ok(None);
38    };
39    let mut argv = ArgvBorrowed::new(buf);
40    let line = &buf[..eol];
41    let mut i = 0;
42    while i < line.len() {
43        if line[i].is_ascii_whitespace() {
44            i += 1;
45            continue;
46        }
47        let start = i;
48        while i < line.len() && !line[i].is_ascii_whitespace() {
49            i += 1;
50        }
51        argv.push_range(start, i);
52    }
53    Ok(Some((argv, eol + 2)))
54}
55
56/// Single-pass multibulk parse: each arg's header is validated and its
57/// `(start, end)` range recorded in the same walk. The old two-pass shape
58/// (`validate_multibulk_frame`, then re-scan every header to record the
59/// ranges) re-paid `find_crlf` + `parse_int` per arg — measurably ~half
60/// the whole parse cost at the 8-shard bench corner. On `Ok(None)` /
61/// `Err` the partially-built argv is simply dropped (a range vec; no
62/// bytes were copied).
63fn parse_multibulk_borrowed(
64    buf: &[u8],
65) -> Result<Option<(ArgvBorrowed<'_>, usize)>, ProtocolError> {
66    let Some(hdr_end) = find_crlf(buf, 1) else {
67        return Ok(None);
68    };
69    let count =
70        parse_int(&buf[1..hdr_end]).ok_or(ProtocolError::Malformed("bad multibulk count"))?;
71    if count < 0 {
72        return Ok(Some((ArgvBorrowed::new(buf), hdr_end + 2)));
73    }
74    let count = count as usize;
75
76    let mut argv = ArgvBorrowed::with_capacity(buf, count);
77    let mut p = hdr_end + 2;
78    for _ in 0..count {
79        match buf.get(p) {
80            None => return Ok(None),
81            Some(b'$') => {}
82            Some(_) => return Err(ProtocolError::Malformed("expected bulk string")),
83        }
84        let Some((len, data_start)) = parse_bulk_len(buf, p)? else {
85            return Ok(None);
86        };
87        let data_end = data_start + len;
88        if buf.len() < data_end + 2 {
89            return Ok(None);
90        }
91        if &buf[data_end..data_end + 2] != b"\r\n" {
92            return Err(ProtocolError::Malformed("bulk string not CRLF-terminated"));
93        }
94        argv.push_range(data_start, data_end);
95        p = data_end + 2;
96    }
97    Ok(Some((argv, p)))
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103    use crate::{encode_command, parse_command};
104
105    #[test]
106    fn borrowed_multibulk_ping() {
107        let frame = b"*1\r\n$4\r\nPING\r\n";
108        let (argv, used) = parse_command_borrowed(frame).unwrap().unwrap();
109        assert_eq!(argv.len(), 1);
110        assert_eq!(argv.first(), Some(b"PING" as &[u8]));
111        assert_eq!(used, frame.len());
112        // The arg slice points back into the original buffer (zero copy).
113        assert_eq!(argv.get(0).unwrap().as_ptr(), frame[8..].as_ptr());
114    }
115
116    #[test]
117    fn borrowed_multibulk_echo_zero_copy() {
118        let frame = b"*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n";
119        let (argv, used) = parse_command_borrowed(frame).unwrap().unwrap();
120        assert_eq!(argv, vec![b"ECHO".to_vec(), b"hello".to_vec()]);
121        assert_eq!(used, frame.len());
122        // Each arg slice's pointer falls inside the input buffer — proves
123        // the borrowed parser never copied the bytes elsewhere.
124        let base = frame.as_ptr() as usize;
125        let end = base + frame.len();
126        for i in 0..argv.len() {
127            let slice = argv.get(i).unwrap();
128            let p = slice.as_ptr() as usize;
129            assert!(
130                p >= base && p + slice.len() <= end,
131                "arg {i} not borrowed from buf"
132            );
133        }
134    }
135
136    #[test]
137    fn borrowed_incomplete_returns_none() {
138        assert!(parse_command_borrowed(b"*1\r\n$4\r\nPI").unwrap().is_none());
139        assert!(
140            parse_command_borrowed(b"*2\r\n$4\r\nECHO\r\n")
141                .unwrap()
142                .is_none()
143        );
144        assert!(parse_command_borrowed(b"").unwrap().is_none());
145    }
146
147    #[test]
148    fn borrowed_inline_command() {
149        let frame = b"PING\r\n";
150        let (argv, used) = parse_command_borrowed(frame).unwrap().unwrap();
151        assert_eq!(argv, vec![b"PING".to_vec()]);
152        assert_eq!(used, frame.len());
153
154        let frame = b"ECHO  hi there\r\n";
155        let (argv, _) = parse_command_borrowed(frame).unwrap().unwrap();
156        assert_eq!(
157            argv,
158            vec![b"ECHO".to_vec(), b"hi".to_vec(), b"there".to_vec()]
159        );
160    }
161
162    #[test]
163    fn borrowed_malformed_errors() {
164        assert!(parse_command_borrowed(b"*1\r\n+OK\r\n").is_err());
165        assert!(parse_command_borrowed(b"*x\r\n").is_err());
166        // Bulk-header malformations through the fused single-pass walk.
167        assert!(parse_command_borrowed(b"*1\r\n$x\r\n").is_err());
168        assert!(parse_command_borrowed(b"*1\r\n$\r\n").is_err());
169        assert!(parse_command_borrowed(b"*1\r\n$-1\r\n").is_err());
170        assert!(parse_command_borrowed(b"*1\r\n$3\rXabc\r\n").is_err());
171        // 20 nines overflows i64 → malformed, not a hang.
172        assert!(parse_command_borrowed(b"*1\r\n$99999999999999999999\r\n").is_err());
173        // Bulk data not CRLF-terminated.
174        assert!(parse_command_borrowed(b"*1\r\n$3\r\nabcXX").is_err());
175    }
176
177    #[test]
178    fn borrowed_incomplete_at_every_prefix_returns_none() {
179        // The single-pass parser must report Ok(None) — never Err, never
180        // Some — for every strict prefix of a valid frame (a split TCP
181        // read can land at any byte).
182        let frame = b"*3\r\n$3\r\nSET\r\n$16\r\nkey:000000000001\r\n$3\r\nxxx\r\n";
183        for cut in 0..frame.len() {
184            let r = parse_command_borrowed(&frame[..cut]);
185            assert!(
186                matches!(r, Ok(None)),
187                "prefix len {cut} gave {:?}",
188                r.map(|o| o.map(|(_, used)| used))
189            );
190        }
191        let (argv, used) = parse_command_borrowed(frame).unwrap().unwrap();
192        assert_eq!(used, frame.len());
193        assert_eq!(argv.len(), 3);
194        assert_eq!(argv.get(1), Some(b"key:000000000001" as &[u8]));
195    }
196
197    #[test]
198    fn borrowed_plus_sign_bulk_len_matches_parse_int_semantics() {
199        // parse_int accepts a leading '+'; the fused header parser keeps
200        // that acceptance so the two parsers agree on every frame.
201        let frame = b"*1\r\n$+4\r\nPING\r\n";
202        let (argv, used) = parse_command_borrowed(frame).unwrap().unwrap();
203        assert_eq!(argv, vec![b"PING".to_vec()]);
204        assert_eq!(used, frame.len());
205    }
206
207    #[test]
208    fn borrowed_null_array_yields_empty_argv() {
209        let frame = b"*-1\r\n";
210        let (argv, used) = parse_command_borrowed(frame).unwrap().unwrap();
211        assert!(argv.is_empty());
212        assert_eq!(used, frame.len());
213    }
214
215    #[test]
216    fn borrowed_into_owned_matches_parse_command() {
217        // For every well-formed frame, parse_command_borrowed(buf).into_owned()
218        // must equal parse_command(buf).0 — the materialised argv.
219        let frames: &[&[u8]] = &[
220            b"*1\r\n$4\r\nPING\r\n",
221            b"*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n",
222            b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$5\r\nvalue\r\n",
223            b"PING\r\n",
224            b"ECHO  hi there\r\n",
225        ];
226        for frame in frames {
227            let (owned, owned_used) = parse_command(frame).unwrap().unwrap();
228            let (borrowed, b_used) = parse_command_borrowed(frame).unwrap().unwrap();
229            assert_eq!(owned_used, b_used, "consumed mismatch for {:?}", frame);
230            let materialised = borrowed.into_owned();
231            assert_eq!(owned, materialised, "argv mismatch for {:?}", frame);
232        }
233    }
234
235    #[test]
236    fn borrowed_round_trip_command() {
237        let mut buf = Vec::new();
238        encode_command(&mut buf, &[b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]);
239        let (argv, used) = parse_command_borrowed(&buf).unwrap().unwrap();
240        assert_eq!(argv, vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]);
241        assert_eq!(used, buf.len());
242    }
243
244    #[test]
245    fn borrowed_handles_split_buffer_after_consumed() {
246        // After consuming one frame, the next call sees only the remainder —
247        // the borrow scopes don't leak across calls.
248        let mut stream = Vec::new();
249        encode_command(&mut stream, &[b"PING".to_vec()]);
250        encode_command(&mut stream, &[b"ECHO".to_vec(), b"hi".to_vec()]);
251        let (a, used) = parse_command_borrowed(&stream).unwrap().unwrap();
252        assert_eq!(a, vec![b"PING".to_vec()]);
253        let (b, used2) = parse_command_borrowed(&stream[used..]).unwrap().unwrap();
254        assert_eq!(b, vec![b"ECHO".to_vec(), b"hi".to_vec()]);
255        assert_eq!(used + used2, stream.len());
256    }
257}