1use crate::command::{Command, ParseOptions};
39use crate::error::ParseError;
40
41pub const STREAMING_THRESHOLD: usize = 64 * 1024;
46
47#[derive(Debug, Clone, PartialEq, Eq)]
51pub struct SetHeader<'a> {
52 pub key: &'a [u8],
54 pub flags: u32,
56 pub exptime: u32,
58 pub noreply: bool,
60}
61
62#[derive(Debug)]
64pub enum ParseProgress<'a> {
65 Incomplete,
67
68 NeedValue {
76 header: SetHeader<'a>,
78 value_len: usize,
80 value_prefix: &'a [u8],
83 header_consumed: usize,
85 },
86
87 Complete(Command<'a>, usize),
91}
92
93pub fn parse_streaming<'a>(
115 buffer: &'a [u8],
116 options: &ParseOptions,
117 streaming_threshold: usize,
118) -> Result<ParseProgress<'a>, ParseError> {
119 let max_line_len = options.max_line_len();
121 let line_end = match find_crlf(buffer, max_line_len)? {
122 Some(pos) => pos,
123 None => return Ok(ParseProgress::Incomplete),
124 };
125
126 let line = &buffer[..line_end];
127 let mut parts = line.split(|&b| b == b' ');
128
129 let cmd = parts.next().ok_or(ParseError::Protocol("empty command"))?;
130
131 if cmd != b"set" && cmd != b"SET" {
133 return match Command::parse_with_options(buffer, options) {
135 Ok((cmd, consumed)) => Ok(ParseProgress::Complete(cmd, consumed)),
136 Err(ParseError::Incomplete) => Ok(ParseProgress::Incomplete),
137 Err(e) => Err(e),
138 };
139 }
140
141 let key = parts
143 .next()
144 .ok_or(ParseError::Protocol("set requires key"))?;
145 if key.is_empty() {
146 return Err(ParseError::Protocol("empty key"));
147 }
148 if key.len() > options.max_key_len {
149 return Err(ParseError::Protocol("key too large"));
150 }
151
152 let flags_str = parts
153 .next()
154 .ok_or(ParseError::Protocol("set requires flags"))?;
155 let exptime_str = parts
156 .next()
157 .ok_or(ParseError::Protocol("set requires exptime"))?;
158 let bytes_str = parts
159 .next()
160 .ok_or(ParseError::Protocol("set requires bytes"))?;
161
162 let flags = parse_u32(flags_str)?;
163 let exptime = parse_u32(exptime_str)?;
164 let value_len = parse_usize(bytes_str)?;
165
166 if value_len > options.max_value_len {
167 return Err(ParseError::Protocol("value too large"));
168 }
169
170 let noreply = parts.next().map(|s| s == b"noreply").unwrap_or(false);
172
173 let header_consumed = line_end + 2;
175
176 if value_len >= streaming_threshold {
178 let value_start = header_consumed;
180 let available = buffer.len().saturating_sub(value_start);
181 let prefix_len = std::cmp::min(available, value_len);
182 let value_prefix = &buffer[value_start..value_start + prefix_len];
183
184 return Ok(ParseProgress::NeedValue {
185 header: SetHeader {
186 key,
187 flags,
188 exptime,
189 noreply,
190 },
191 value_len,
192 value_prefix,
193 header_consumed,
194 });
195 }
196
197 let data_start = header_consumed;
199 let data_end = data_start
200 .checked_add(value_len)
201 .ok_or(ParseError::InvalidNumber)?;
202 let total_len = data_end.checked_add(2).ok_or(ParseError::InvalidNumber)?;
203
204 if buffer.len() < total_len {
205 return Ok(ParseProgress::Incomplete);
206 }
207
208 if buffer[data_end] != b'\r' || buffer[data_end + 1] != b'\n' {
210 return Err(ParseError::Protocol("missing data terminator"));
211 }
212
213 let data = &buffer[data_start..data_end];
214 Ok(ParseProgress::Complete(
215 Command::Set {
216 key,
217 flags,
218 exptime,
219 data,
220 },
221 total_len,
222 ))
223}
224
225pub fn complete_set<'a>(header: &SetHeader<'_>, value: &'a [u8]) -> Command<'a> {
239 Command::Set {
240 key: unsafe {
241 std::mem::transmute::<&[u8], &'a [u8]>(header.key)
245 },
246 flags: header.flags,
247 exptime: header.exptime,
248 data: value,
249 }
250}
251
252fn find_crlf(buffer: &[u8], max_line_len: usize) -> Result<Option<usize>, ParseError> {
254 if let Some(pos) = memchr::memchr(b'\r', buffer)
255 .filter(|&pos| pos + 1 < buffer.len() && buffer[pos + 1] == b'\n')
256 {
257 return Ok(Some(pos));
258 }
259
260 if buffer.len() > max_line_len {
262 return Err(ParseError::Protocol("line too long"));
263 }
264
265 Ok(None)
266}
267
268fn parse_u32(data: &[u8]) -> Result<u32, ParseError> {
270 std::str::from_utf8(data)
271 .map_err(|_| ParseError::InvalidNumber)?
272 .parse()
273 .map_err(|_| ParseError::InvalidNumber)
274}
275
276fn parse_usize(data: &[u8]) -> Result<usize, ParseError> {
278 std::str::from_utf8(data)
279 .map_err(|_| ParseError::InvalidNumber)?
280 .parse()
281 .map_err(|_| ParseError::InvalidNumber)
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287
288 #[test]
289 fn test_small_set_complete() {
290 let data = b"set mykey 0 3600 7\r\nmyvalue\r\n";
291 let result = parse_streaming(data, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap();
292
293 match result {
294 ParseProgress::Complete(cmd, consumed) => {
295 assert_eq!(
296 cmd,
297 Command::Set {
298 key: b"mykey",
299 flags: 0,
300 exptime: 3600,
301 data: b"myvalue",
302 }
303 );
304 assert_eq!(consumed, data.len());
305 }
306 _ => panic!("expected Complete"),
307 }
308 }
309
310 #[test]
311 fn test_large_set_needs_value() {
312 let value_len = 100 * 1024; let header = format!("set mykey 0 3600 {}\r\n", value_len);
314 let mut data = header.as_bytes().to_vec();
315 data.extend_from_slice(&vec![b'x'; 1000]);
317
318 let result = parse_streaming(&data, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap();
319
320 match result {
321 ParseProgress::NeedValue {
322 header,
323 value_len: vl,
324 value_prefix,
325 header_consumed,
326 } => {
327 assert_eq!(header.key, b"mykey");
328 assert_eq!(header.flags, 0);
329 assert_eq!(header.exptime, 3600);
330 assert!(!header.noreply);
331 assert_eq!(vl, 100 * 1024);
332 assert_eq!(value_prefix.len(), 1000);
333 assert!(value_prefix.iter().all(|&b| b == b'x'));
334 assert_eq!(header_consumed, 25); }
336 _ => panic!("expected NeedValue, got {:?}", result),
337 }
338 }
339
340 #[test]
341 fn test_set_with_noreply() {
342 let value_len = 100 * 1024;
343 let header = format!("set mykey 0 3600 {} noreply\r\n", value_len);
344
345 let result = parse_streaming(
346 header.as_bytes(),
347 &ParseOptions::default(),
348 STREAMING_THRESHOLD,
349 )
350 .unwrap();
351
352 match result {
353 ParseProgress::NeedValue { header, .. } => {
354 assert!(header.noreply);
355 }
356 _ => panic!("expected NeedValue"),
357 }
358 }
359
360 #[test]
361 fn test_get_uses_normal_path() {
362 let data = b"get mykey\r\n";
363 let result = parse_streaming(data, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap();
364
365 match result {
366 ParseProgress::Complete(cmd, consumed) => {
367 assert_eq!(cmd, Command::Get { key: b"mykey" });
368 assert_eq!(consumed, data.len());
369 }
370 _ => panic!("expected Complete"),
371 }
372 }
373
374 #[test]
375 fn test_incomplete_header() {
376 let data = b"set mykey 0 360";
377 let result = parse_streaming(data, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap();
378
379 match result {
380 ParseProgress::Incomplete => {}
381 _ => panic!("expected Incomplete"),
382 }
383 }
384
385 #[test]
386 fn test_incomplete_small_value() {
387 let data = b"set mykey 0 3600 100\r\npartial";
388 let result = parse_streaming(data, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap();
389
390 match result {
391 ParseProgress::Incomplete => {}
392 _ => panic!("expected Incomplete"),
393 }
394 }
395
396 #[test]
397 fn test_threshold_boundary() {
398 let value_len = STREAMING_THRESHOLD;
400 let header = format!("set mykey 0 3600 {}\r\n", value_len);
401
402 let result = parse_streaming(
403 header.as_bytes(),
404 &ParseOptions::default(),
405 STREAMING_THRESHOLD,
406 )
407 .unwrap();
408
409 match result {
410 ParseProgress::NeedValue { value_len: vl, .. } => {
411 assert_eq!(vl, STREAMING_THRESHOLD);
412 }
413 _ => panic!("expected NeedValue at threshold"),
414 }
415
416 let value_len = STREAMING_THRESHOLD - 1;
418 let header = format!("set mykey 0 3600 {}\r\n", value_len);
419
420 let result = parse_streaming(
421 header.as_bytes(),
422 &ParseOptions::default(),
423 STREAMING_THRESHOLD,
424 )
425 .unwrap();
426
427 match result {
428 ParseProgress::Incomplete => {}
429 _ => panic!("expected Incomplete for sub-threshold without data"),
430 }
431 }
432
433 #[test]
434 fn test_delete_command() {
435 let data = b"delete mykey\r\n";
436 let result = parse_streaming(data, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap();
437
438 match result {
439 ParseProgress::Complete(Command::Delete { key }, consumed) => {
440 assert_eq!(key, b"mykey");
441 assert_eq!(consumed, data.len());
442 }
443 _ => panic!("expected Complete Delete"),
444 }
445 }
446
447 #[test]
448 fn test_flush_all_command() {
449 let data = b"flush_all\r\n";
450 let result = parse_streaming(data, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap();
451
452 match result {
453 ParseProgress::Complete(Command::FlushAll, consumed) => {
454 assert_eq!(consumed, data.len());
455 }
456 _ => panic!("expected Complete FlushAll"),
457 }
458 }
459
460 #[test]
461 fn test_complete_set_helper() {
462 let header = SetHeader {
463 key: b"mykey",
464 flags: 42,
465 exptime: 3600,
466 noreply: false,
467 };
468 let value = b"myvalue";
469
470 let cmd = complete_set(&header, value);
471
472 match cmd {
473 Command::Set {
474 key,
475 flags,
476 exptime,
477 data,
478 } => {
479 assert_eq!(key, b"mykey");
480 assert_eq!(flags, 42);
481 assert_eq!(exptime, 3600);
482 assert_eq!(data, b"myvalue");
483 }
484 _ => panic!("expected Set command"),
485 }
486 }
487}