Skip to main content

memcache_proto/
streaming.rs

1//! Streaming parser for large memcache SET commands.
2//!
3//! This module provides streaming support for SET commands with large values,
4//! allowing zero-copy receive directly into cache segment memory.
5//!
6//! # Overview
7//!
8//! For SET commands with values >= `STREAMING_THRESHOLD`, the parser returns
9//! `NeedValue` after parsing the command header, allowing the caller to receive
10//! the value directly into a target buffer (e.g., cache segment memory).
11//!
//! For all other commands (including small SETs), it behaves identically to
//! `Command::parse_with_options()`.
14//!
15//! # Example
16//!
17//! ```ignore
18//! use protocol_memcache::{parse_streaming, ParseProgress, STREAMING_THRESHOLD};
19//!
20//! let data = b"set mykey 0 3600 100000\r\n";
21//!
22//! match parse_streaming(data, &ParseOptions::default(), STREAMING_THRESHOLD)? {
23//!     ParseProgress::NeedValue { header, value_len, value_prefix, header_consumed } => {
24//!         // Allocate buffer and receive value directly
25//!         let mut reservation = cache.begin_segment_set(&header.key, value_len, ttl)?;
26//!         reservation.value_mut()[..value_prefix.len()].copy_from_slice(value_prefix);
27//!         // ... receive remaining bytes ...
28//!     }
29//!     ParseProgress::Complete(cmd, consumed) => {
30//!         // Small SET or other command - handle normally
31//!     }
32//!     ParseProgress::Incomplete => {
33//!         // Need more data
34//!     }
35//! }
36//! ```
37
38use crate::command::{Command, ParseOptions};
39use crate::error::ParseError;
40
/// Default value size, in bytes, at which SET commands take the streaming path (64 KiB).
///
/// When passed as the threshold to `parse_streaming`, a SET whose declared
/// value length is at least this large yields `NeedValue` rather than a fully
/// parsed command, so the value can be received without an intermediate copy.
pub const STREAMING_THRESHOLD: usize = 1 << 16;
46
/// Metadata taken from a SET command line, before any value bytes are read.
///
/// Holds everything needed to finish the SET once the value has been received.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SetHeader<'a> {
    /// Key being stored (borrowed, not copied).
    pub key: &'a [u8],
    /// Flags supplied by the client; the server treats them as opaque.
    pub flags: u32,
    /// Expiration in seconds (0 = never expires; values above 30 days are
    /// interpreted as a Unix timestamp).
    pub exptime: u32,
    /// `true` when the client sent the "noreply" option and expects no response.
    pub noreply: bool,
}
61
62/// Result of incremental parsing.
63#[derive(Debug)]
64pub enum ParseProgress<'a> {
65    /// Need more data to continue parsing.
66    Incomplete,
67
68    /// SET header parsed, waiting for value data.
69    ///
70    /// The caller should:
71    /// 1. Allocate a buffer for the value (e.g., in cache segment)
72    /// 2. Copy `value_prefix` to the start of that buffer
73    /// 3. Receive remaining `value_len - value_prefix.len()` bytes into the buffer
74    /// 4. Consume `header_consumed + value_len + 2` bytes from the input buffer
75    NeedValue {
76        /// Parsed command header with metadata.
77        header: SetHeader<'a>,
78        /// Total size of the value in bytes.
79        value_len: usize,
80        /// Bytes of value already in the parse buffer (may be empty).
81        /// These must be copied to the target buffer before receiving more.
82        value_prefix: &'a [u8],
83        /// Bytes consumed from buffer so far (header + CRLF, before value).
84        header_consumed: usize,
85    },
86
87    /// Fully parsed command.
88    ///
89    /// The tuple contains the parsed command and the number of bytes consumed.
90    Complete(Command<'a>, usize),
91}
92
93/// Parse a memcache command with streaming support for large values.
94///
95/// For SET commands with values >= `streaming_threshold`, this returns
96/// `NeedValue` after parsing the header, allowing the caller to receive
97/// the value directly into a target buffer.
98///
99/// For all other commands (including small SETs), it behaves identically to
100/// `Command::parse_with_options()`.
101///
102/// # Arguments
103///
104/// * `buffer` - The input buffer containing memcache protocol data
105/// * `options` - Parse options (max lengths, etc.)
106/// * `streaming_threshold` - Minimum value size for streaming (use `STREAMING_THRESHOLD`)
107///
108/// # Returns
109///
110/// * `Ok(ParseProgress::Incomplete)` - Need more data
111/// * `Ok(ParseProgress::NeedValue { .. })` - SET header parsed, value pending
112/// * `Ok(ParseProgress::Complete(cmd, consumed))` - Fully parsed command
113/// * `Err(ParseError)` - Parse error
114pub fn parse_streaming<'a>(
115    buffer: &'a [u8],
116    options: &ParseOptions,
117    streaming_threshold: usize,
118) -> Result<ParseProgress<'a>, ParseError> {
119    // Find the end of the command line
120    let max_line_len = options.max_line_len();
121    let line_end = match find_crlf(buffer, max_line_len)? {
122        Some(pos) => pos,
123        None => return Ok(ParseProgress::Incomplete),
124    };
125
126    let line = &buffer[..line_end];
127    let mut parts = line.split(|&b| b == b' ');
128
129    let cmd = parts.next().ok_or(ParseError::Protocol("empty command"))?;
130
131    // Only SET commands use the streaming path
132    if cmd != b"set" && cmd != b"SET" {
133        // Use the standard parser for non-SET commands
134        return match Command::parse_with_options(buffer, options) {
135            Ok((cmd, consumed)) => Ok(ParseProgress::Complete(cmd, consumed)),
136            Err(ParseError::Incomplete) => Ok(ParseProgress::Incomplete),
137            Err(e) => Err(e),
138        };
139    }
140
141    // Parse SET command header
142    let key = parts
143        .next()
144        .ok_or(ParseError::Protocol("set requires key"))?;
145    if key.is_empty() {
146        return Err(ParseError::Protocol("empty key"));
147    }
148    if key.len() > options.max_key_len {
149        return Err(ParseError::Protocol("key too large"));
150    }
151
152    let flags_str = parts
153        .next()
154        .ok_or(ParseError::Protocol("set requires flags"))?;
155    let exptime_str = parts
156        .next()
157        .ok_or(ParseError::Protocol("set requires exptime"))?;
158    let bytes_str = parts
159        .next()
160        .ok_or(ParseError::Protocol("set requires bytes"))?;
161
162    let flags = parse_u32(flags_str)?;
163    let exptime = parse_u32(exptime_str)?;
164    let value_len = parse_usize(bytes_str)?;
165
166    if value_len > options.max_value_len {
167        return Err(ParseError::Protocol("value too large"));
168    }
169
170    // Check for optional "noreply"
171    let noreply = parts.next().map(|s| s == b"noreply").unwrap_or(false);
172
173    // Header ends after the CRLF
174    let header_consumed = line_end + 2;
175
176    // For large values, return NeedValue to allow streaming
177    if value_len >= streaming_threshold {
178        // Calculate how much of the value is already in the buffer
179        let value_start = header_consumed;
180        let available = buffer.len().saturating_sub(value_start);
181        let prefix_len = std::cmp::min(available, value_len);
182        let value_prefix = &buffer[value_start..value_start + prefix_len];
183
184        return Ok(ParseProgress::NeedValue {
185            header: SetHeader {
186                key,
187                flags,
188                exptime,
189                noreply,
190            },
191            value_len,
192            value_prefix,
193            header_consumed,
194        });
195    }
196
197    // For small values, parse the complete command
198    let data_start = header_consumed;
199    let data_end = data_start
200        .checked_add(value_len)
201        .ok_or(ParseError::InvalidNumber)?;
202    let total_len = data_end.checked_add(2).ok_or(ParseError::InvalidNumber)?;
203
204    if buffer.len() < total_len {
205        return Ok(ParseProgress::Incomplete);
206    }
207
208    // Verify trailing \r\n
209    if buffer[data_end] != b'\r' || buffer[data_end + 1] != b'\n' {
210        return Err(ParseError::Protocol("missing data terminator"));
211    }
212
213    let data = &buffer[data_start..data_end];
214    Ok(ParseProgress::Complete(
215        Command::Set {
216            key,
217            flags,
218            exptime,
219            data,
220        },
221        total_len,
222    ))
223}
224
225/// Complete a SET command after receiving the full value.
226///
227/// This is a helper for constructing the final Command after streaming receive.
228/// The caller is responsible for ensuring the value data is correct.
229///
230/// # Arguments
231///
232/// * `header` - The parsed SET header from `NeedValue`
233/// * `value` - The complete value data (must match the expected length)
234///
235/// # Returns
236///
237/// A `Command::Set` with the provided value.
238pub fn complete_set<'a>(header: &SetHeader<'_>, value: &'a [u8]) -> Command<'a> {
239    Command::Set {
240        key: unsafe {
241            // Safety: The key reference is valid for the lifetime of the original buffer.
242            // We transmute the lifetime to match the value's lifetime since the caller
243            // is responsible for ensuring both are valid.
244            std::mem::transmute::<&[u8], &'a [u8]>(header.key)
245        },
246        flags: header.flags,
247        exptime: header.exptime,
248        data: value,
249    }
250}
251
252/// Find \r\n in buffer, return position of \r.
253fn find_crlf(buffer: &[u8], max_line_len: usize) -> Result<Option<usize>, ParseError> {
254    if let Some(pos) = memchr::memchr(b'\r', buffer)
255        .filter(|&pos| pos + 1 < buffer.len() && buffer[pos + 1] == b'\n')
256    {
257        return Ok(Some(pos));
258    }
259
260    // No CRLF found - check if we've exceeded the line length limit
261    if buffer.len() > max_line_len {
262        return Err(ParseError::Protocol("line too long"));
263    }
264
265    Ok(None)
266}
267
268/// Parse a u32 from ASCII decimal.
269fn parse_u32(data: &[u8]) -> Result<u32, ParseError> {
270    std::str::from_utf8(data)
271        .map_err(|_| ParseError::InvalidNumber)?
272        .parse()
273        .map_err(|_| ParseError::InvalidNumber)
274}
275
276/// Parse a usize from ASCII decimal.
277fn parse_usize(data: &[u8]) -> Result<usize, ParseError> {
278    std::str::from_utf8(data)
279        .map_err(|_| ParseError::InvalidNumber)?
280        .parse()
281        .map_err(|_| ParseError::InvalidNumber)
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the parser with default options and the default streaming
    /// threshold, panicking on a parse error.
    fn parse(input: &[u8]) -> ParseProgress<'_> {
        parse_streaming(input, &ParseOptions::default(), STREAMING_THRESHOLD).unwrap()
    }

    /// A SET below the threshold with its full value buffered parses completely.
    #[test]
    fn test_small_set_complete() {
        let input = b"set mykey 0 3600 7\r\nmyvalue\r\n";
        let expected = Command::Set {
            key: b"mykey",
            flags: 0,
            exptime: 3600,
            data: b"myvalue",
        };

        match parse(input) {
            ParseProgress::Complete(cmd, consumed) => {
                assert_eq!(cmd, expected);
                assert_eq!(consumed, input.len());
            }
            _ => panic!("expected Complete"),
        }
    }

    /// A SET above the threshold yields its header plus the buffered prefix.
    #[test]
    fn test_large_set_needs_value() {
        let declared = 100 * 1024; // 100KB
        let mut input = format!("set mykey 0 3600 {}\r\n", declared).into_bytes();
        // Part of the value is already buffered behind the header.
        input.extend(std::iter::repeat(b'x').take(1000));

        match parse(&input) {
            ParseProgress::NeedValue {
                header,
                value_len,
                value_prefix,
                header_consumed,
            } => {
                assert_eq!(header.key, b"mykey");
                assert_eq!(header.flags, 0);
                assert_eq!(header.exptime, 3600);
                assert!(!header.noreply);
                assert_eq!(value_len, 100 * 1024);
                assert_eq!(value_prefix.len(), 1000);
                assert!(value_prefix.iter().all(|&b| b == b'x'));
                // "set mykey 0 3600 102400\r\n".len()
                assert_eq!(header_consumed, 25);
            }
            other => panic!("expected NeedValue, got {:?}", other),
        }
    }

    /// The trailing "noreply" token is reflected in the header.
    #[test]
    fn test_set_with_noreply() {
        let line = format!("set mykey 0 3600 {} noreply\r\n", 100 * 1024);

        match parse(line.as_bytes()) {
            ParseProgress::NeedValue { header, .. } => assert!(header.noreply),
            _ => panic!("expected NeedValue"),
        }
    }

    /// GET is handled by the regular command parser.
    #[test]
    fn test_get_uses_normal_path() {
        let input = b"get mykey\r\n";

        match parse(input) {
            ParseProgress::Complete(cmd, consumed) => {
                assert_eq!(cmd, Command::Get { key: b"mykey" });
                assert_eq!(consumed, input.len());
            }
            _ => panic!("expected Complete"),
        }
    }

    /// A command line without its CRLF is incomplete.
    #[test]
    fn test_incomplete_header() {
        assert!(
            matches!(parse(b"set mykey 0 360"), ParseProgress::Incomplete),
            "expected Incomplete"
        );
    }

    /// A small SET whose value is only partially buffered is incomplete.
    #[test]
    fn test_incomplete_small_value() {
        assert!(
            matches!(
                parse(b"set mykey 0 3600 100\r\npartial"),
                ParseProgress::Incomplete
            ),
            "expected Incomplete"
        );
    }

    /// The threshold itself streams; one byte less does not.
    #[test]
    fn test_threshold_boundary() {
        // At threshold - should use streaming
        let at_threshold = format!("set mykey 0 3600 {}\r\n", STREAMING_THRESHOLD);
        match parse(at_threshold.as_bytes()) {
            ParseProgress::NeedValue { value_len, .. } => {
                assert_eq!(value_len, STREAMING_THRESHOLD);
            }
            _ => panic!("expected NeedValue at threshold"),
        }

        // Just below threshold - should use normal path (but incomplete without value)
        let below_threshold = format!("set mykey 0 3600 {}\r\n", STREAMING_THRESHOLD - 1);
        assert!(
            matches!(
                parse(below_threshold.as_bytes()),
                ParseProgress::Incomplete
            ),
            "expected Incomplete for sub-threshold without data"
        );
    }

    /// DELETE is handled by the regular command parser.
    #[test]
    fn test_delete_command() {
        let input = b"delete mykey\r\n";

        match parse(input) {
            ParseProgress::Complete(Command::Delete { key }, consumed) => {
                assert_eq!(key, b"mykey");
                assert_eq!(consumed, input.len());
            }
            _ => panic!("expected Complete Delete"),
        }
    }

    /// FLUSH_ALL is handled by the regular command parser.
    #[test]
    fn test_flush_all_command() {
        let input = b"flush_all\r\n";

        match parse(input) {
            ParseProgress::Complete(Command::FlushAll, consumed) => {
                assert_eq!(consumed, input.len());
            }
            _ => panic!("expected Complete FlushAll"),
        }
    }

    /// `complete_set` combines header metadata with the supplied value.
    #[test]
    fn test_complete_set_helper() {
        let header = SetHeader {
            key: b"mykey",
            flags: 42,
            exptime: 3600,
            noreply: false,
        };

        match complete_set(&header, b"myvalue") {
            Command::Set {
                key,
                flags,
                exptime,
                data,
            } => {
                assert_eq!(key, b"mykey");
                assert_eq!(flags, 42);
                assert_eq!(exptime, 3600);
                assert_eq!(data, b"myvalue");
            }
            _ => panic!("expected Set command"),
        }
    }
}