//! `memcached_async/codec/binary.rs`: binary protocol request decoding and response encoding.

1use bytes::{Bytes, BytesMut};
2
3use crate::codec::DecodeOutcome;
4use crate::error::Error;
5use crate::response::Response;
6use crate::types::{Op, Protocol, ReplyMode, Request, RequestMeta, StatLine};
7
/// Size limits applied while decoding binary-protocol frames.
#[derive(Debug, Clone, Copy)]
pub struct BinaryLimits {
    /// Upper bound for the body length announced in a frame header; the decoder
    /// answers with an error instead of buffering a larger body.
    pub max_frame_len: usize,
}

impl Default for BinaryLimits {
    /// Allows frame bodies of up to 2 MiB (`1 << 21` bytes).
    fn default() -> Self {
        BinaryLimits {
            max_frame_len: 2 * 1024 * 1024,
        }
    }
}
21
22/// Binary protocol decoder.
23#[derive(Debug, Clone, Default)]
24pub struct BinaryDecoder;
25
26impl BinaryDecoder {
27    pub fn new() -> Self {
28        Self
29    }
30
31    pub fn decode(&mut self, buf: &mut BytesMut, limits: BinaryLimits) -> Option<DecodeOutcome> {
32        if buf.len() < 24 {
33            return None;
34        }
35        let header = &buf[..24];
36        let magic = header[0];
37        let opcode = header[1];
38        let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
39        let extras_len = header[4] as usize;
40        let body_len = u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
41        let opaque = u32::from_be_bytes([header[12], header[13], header[14], header[15]]);
42        let cas = u64::from_be_bytes([
43            header[16], header[17], header[18], header[19], header[20], header[21], header[22],
44            header[23],
45        ]);
46
47        let mut meta = RequestMeta {
48            protocol: Protocol::Binary,
49            reply: ReplyMode::Always,
50            opaque: Some(opaque),
51            return_key: false,
52            opcode,
53        };
54
55        if magic != 0x80 {
56            let err = Error::server("invalid magic");
57            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
58        }
59        if body_len > limits.max_frame_len {
60            let err = Error::server("frame too large");
61            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
62        }
63        if buf.len() < 24 + body_len {
64            return None;
65        }
66
67        let frame = buf.split_to(24 + body_len).freeze();
68        let body = frame.slice(24..);
69
70        if extras_len + key_len > body_len {
71            let err = Error::client("invalid lengths");
72            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
73        }
74
75        let extras = body.slice(0..extras_len);
76        let key = body.slice(extras_len..extras_len + key_len);
77        let value = body.slice(extras_len + key_len..body_len);
78
79        let (op, quiet, return_key) = opcode_to_op(opcode);
80        meta.reply = if quiet {
81            ReplyMode::QuietBuffered
82        } else {
83            ReplyMode::Always
84        };
85        meta.return_key = return_key;
86
87        let mut req = Request::new(op);
88
89        if key_len > 0 {
90            if key_len > 250 {
91                let err = Error::client("key too long");
92                return Some(DecodeOutcome::Response(meta, Response::Error(err)));
93            }
94            if !is_valid_key(&key) {
95                let err = Error::client("invalid key");
96                return Some(DecodeOutcome::Response(meta, Response::Error(err)));
97            }
98            req.key = Some(key.clone());
99        }
100
101        if cas != 0 {
102            req.cas = Some(cas);
103        }
104
105        let result = match op {
106            Op::Get => {
107                if key_len == 0 || extras_len != 0 || !value.is_empty() {
108                    Err("invalid get")
109                } else {
110                    Ok(())
111                }
112            }
113            Op::Set | Op::Add | Op::Replace => {
114                if key_len == 0 || extras_len != 8 {
115                    Err("invalid storage")
116                } else {
117                    parse_flags_exptime(&extras, &mut req);
118                    req.value = Some(value);
119                    Ok(())
120                }
121            }
122            Op::Append | Op::Prepend => {
123                if key_len == 0 || (extras_len != 0 && extras_len != 8) {
124                    Err("invalid append")
125                } else {
126                    if extras_len == 8 {
127                        parse_flags_exptime(&extras, &mut req);
128                    }
129                    req.value = Some(value);
130                    Ok(())
131                }
132            }
133            Op::Delete => {
134                if key_len == 0 || extras_len != 0 || !value.is_empty() {
135                    Err("invalid delete")
136                } else {
137                    Ok(())
138                }
139            }
140            Op::Incr | Op::Decr => {
141                if key_len == 0 || extras_len != 20 || !value.is_empty() {
142                    Err("invalid incr")
143                } else {
144                    parse_delta(&extras, &mut req);
145                    Ok(())
146                }
147            }
148            Op::Touch => {
149                if key_len == 0 || extras_len != 4 || !value.is_empty() {
150                    Err("invalid touch")
151                } else {
152                    req.exptime =
153                        Some(
154                            u32::from_be_bytes([extras[0], extras[1], extras[2], extras[3]]) as i64,
155                        );
156                    Ok(())
157                }
158            }
159            Op::Stats => {
160                if extras_len != 0 {
161                    Err("invalid stats")
162                } else {
163                    if key_len > 0 {
164                        req.key = Some(key);
165                    }
166                    Ok(())
167                }
168            }
169            Op::Version | Op::Noop | Op::Quit => {
170                if extras_len != 0 || key_len != 0 || !value.is_empty() {
171                    Err("invalid command")
172                } else {
173                    Ok(())
174                }
175            }
176            Op::Gets | Op::Gat | Op::Gats | Op::Cas => Ok(()),
177            Op::Unknown
178            | Op::MetaGet
179            | Op::MetaSet
180            | Op::MetaDelete
181            | Op::MetaArithmetic
182            | Op::MetaDebug
183            | Op::MetaNoop => Ok(()),
184        };
185
186        if result.is_err() {
187            let err = Error::client("invalid arguments");
188            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
189        }
190
191        Some(DecodeOutcome::Request(req, meta))
192    }
193}
194
195fn parse_flags_exptime(extras: &Bytes, req: &mut Request) {
196    if extras.len() < 8 {
197        return;
198    }
199    req.flags = Some(u32::from_be_bytes([
200        extras[0], extras[1], extras[2], extras[3],
201    ]));
202    req.exptime = Some(u32::from_be_bytes([extras[4], extras[5], extras[6], extras[7]]) as i64);
203}
204
205fn parse_delta(extras: &Bytes, req: &mut Request) {
206    if extras.len() < 20 {
207        return;
208    }
209    req.delta = Some(u64::from_be_bytes([
210        extras[0], extras[1], extras[2], extras[3], extras[4], extras[5], extras[6], extras[7],
211    ]));
212    req.initial = Some(u64::from_be_bytes([
213        extras[8], extras[9], extras[10], extras[11], extras[12], extras[13], extras[14],
214        extras[15],
215    ]));
216    req.exptime = Some(u32::from_be_bytes([extras[16], extras[17], extras[18], extras[19]]) as i64);
217}
218
219fn opcode_to_op(opcode: u8) -> (Op, bool, bool) {
220    match opcode {
221        0x00 => (Op::Get, false, false),
222        0x09 => (Op::Get, true, false),
223        0x0c => (Op::Get, false, true),
224        0x0d => (Op::Get, true, true),
225        0x01 => (Op::Set, false, false),
226        0x11 => (Op::Set, true, false),
227        0x02 => (Op::Add, false, false),
228        0x12 => (Op::Add, true, false),
229        0x03 => (Op::Replace, false, false),
230        0x13 => (Op::Replace, true, false),
231        0x04 => (Op::Delete, false, false),
232        0x14 => (Op::Delete, true, false),
233        0x05 => (Op::Incr, false, false),
234        0x15 => (Op::Incr, true, false),
235        0x06 => (Op::Decr, false, false),
236        0x16 => (Op::Decr, true, false),
237        0x07 => (Op::Quit, false, false),
238        0x17 => (Op::Quit, true, false),
239        0x0a => (Op::Noop, false, false),
240        0x0b => (Op::Version, false, false),
241        0x0e => (Op::Append, false, false),
242        0x19 => (Op::Append, true, false),
243        0x0f => (Op::Prepend, false, false),
244        0x1a => (Op::Prepend, true, false),
245        0x10 => (Op::Stats, false, false),
246        _ => (Op::Unknown, false, false),
247    }
248}
249
250fn is_valid_key(key: &Bytes) -> bool {
251    if key.is_empty() || key.len() > 250 {
252        return false;
253    }
254    for &b in key.as_ref() {
255        if b <= b' ' || b == 0x7f {
256            return false;
257        }
258    }
259    true
260}
261
/// Response status: the request succeeded.
pub const STATUS_SUCCESS: u16 = 0x0000;
/// Response status: the key was not found.
pub const STATUS_KEY_NOT_FOUND: u16 = 0x0001;
/// Response status: the key exists.
pub const STATUS_KEY_EXISTS: u16 = 0x0002;
/// Response status: the request had invalid arguments.
pub const STATUS_INVALID_ARGUMENTS: u16 = 0x0004;
/// Response status: the item was not stored.
pub const STATUS_ITEM_NOT_STORED: u16 = 0x0005;
/// Response status: the command is unknown.
pub const STATUS_UNKNOWN_COMMAND: u16 = 0x0081;
/// Response status: internal error.
pub const STATUS_INTERNAL_ERROR: u16 = 0x0084;
269
270pub fn encode_binary_response(
271    meta: RequestMeta,
272    response: &Response,
273    out: &mut BytesMut,
274    return_key: bool,
275) -> (u16, usize) {
276    let opcode = meta.opcode;
277    let opaque = meta.opaque.unwrap_or(0);
278
279    match response {
280        Response::Stored | Response::Deleted | Response::Touched | Response::Noop => {
281            encode_header(
282                out,
283                HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
284            );
285            (STATUS_SUCCESS, 24)
286        }
287        Response::NotStored => {
288            encode_header(
289                out,
290                HeaderFields::new(opcode, 0, 0, STATUS_ITEM_NOT_STORED, 0, opaque, 0),
291            );
292            (STATUS_ITEM_NOT_STORED, 24)
293        }
294        Response::Exists => {
295            encode_header(
296                out,
297                HeaderFields::new(opcode, 0, 0, STATUS_KEY_EXISTS, 0, opaque, 0),
298            );
299            (STATUS_KEY_EXISTS, 24)
300        }
301        Response::NotFound => {
302            encode_header(
303                out,
304                HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
305            );
306            (STATUS_KEY_NOT_FOUND, 24)
307        }
308        Response::Numeric(value) => {
309            let extras_len = 0u8;
310            let key_len = 0u16;
311            let body_len = 8u32;
312            encode_header(
313                out,
314                HeaderFields::new(
315                    opcode,
316                    extras_len,
317                    key_len,
318                    STATUS_SUCCESS,
319                    body_len,
320                    opaque,
321                    0,
322                ),
323            );
324            out.extend_from_slice(&value.to_be_bytes());
325            (STATUS_SUCCESS, 24 + 8)
326        }
327        Response::Value(entry) => {
328            let extras_len = 4u8;
329            let key = if return_key { entry.key.as_ref() } else { b"" };
330            let key_len = key.len() as u16;
331            let body_len = extras_len as u32 + key_len as u32 + entry.value.len() as u32;
332            encode_header(
333                out,
334                HeaderFields::new(
335                    opcode,
336                    extras_len,
337                    key_len,
338                    STATUS_SUCCESS,
339                    body_len,
340                    opaque,
341                    entry.cas.unwrap_or(0),
342                ),
343            );
344            out.extend_from_slice(&entry.flags.to_be_bytes());
345            if return_key {
346                out.extend_from_slice(key);
347            }
348            out.extend_from_slice(entry.value.as_ref());
349            (STATUS_SUCCESS, 24 + body_len as usize)
350        }
351        Response::Values(entries) => {
352            if let Some(entry) = entries.first() {
353                encode_binary_response(meta, &Response::Value(entry.clone()), out, return_key)
354            } else {
355                encode_header(
356                    out,
357                    HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
358                );
359                (STATUS_KEY_NOT_FOUND, 24)
360            }
361        }
362        Response::Stats(lines) => {
363            let mut total = 0usize;
364            for line in lines {
365                total += encode_stat_line(meta, line, out);
366            }
367            total += encode_header(
368                out,
369                HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
370            );
371            (STATUS_SUCCESS, total)
372        }
373        Response::Version(version) => {
374            let body_len = version.len() as u32;
375            encode_header(
376                out,
377                HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, body_len, opaque, 0),
378            );
379            out.extend_from_slice(version.as_ref());
380            (STATUS_SUCCESS, 24 + version.len())
381        }
382        Response::Error(err) => {
383            let status = match err.kind {
384                crate::error::ErrorKind::UnknownCommand => STATUS_UNKNOWN_COMMAND,
385                crate::error::ErrorKind::Client => STATUS_INVALID_ARGUMENTS,
386                crate::error::ErrorKind::Server => STATUS_INTERNAL_ERROR,
387            };
388            let body_len = err.message.len() as u32;
389            encode_header(
390                out,
391                HeaderFields::new(opcode, 0, 0, status, body_len, opaque, 0),
392            );
393            out.extend_from_slice(err.message.as_ref());
394            (status, 24 + err.message.len())
395        }
396        Response::Meta(_) | Response::ValuesStream(_) | Response::StatsStream(_) => {
397            encode_header(
398                out,
399                HeaderFields::new(opcode, 0, 0, STATUS_INTERNAL_ERROR, 0, opaque, 0),
400            );
401            (STATUS_INTERNAL_ERROR, 24)
402        }
403    }
404}
405
/// The variable fields of a 24-byte response header, as consumed by `encode_header`.
struct HeaderFields {
    /// Opcode echoed from the request.
    opcode: u8,
    /// Length in bytes of the extras section of the body.
    extras_len: u8,
    /// Length in bytes of the key section of the body.
    key_len: u16,
    /// Response status code.
    status: u16,
    /// Total body length written into the header.
    body_len: u32,
    /// Opaque value echoed from the request.
    opaque: u32,
    /// CAS value written into the header.
    cas: u64,
}

impl HeaderFields {
    /// Bundles the header values; arguments follow the struct's field order.
    fn new(
        opcode: u8,
        extras_len: u8,
        key_len: u16,
        status: u16,
        body_len: u32,
        opaque: u32,
        cas: u64,
    ) -> Self {
        HeaderFields {
            cas,
            opaque,
            body_len,
            status,
            key_len,
            extras_len,
            opcode,
        }
    }
}
437
438fn encode_header(out: &mut BytesMut, header: HeaderFields) -> usize {
439    let HeaderFields {
440        opcode,
441        extras_len,
442        key_len,
443        status,
444        body_len,
445        opaque,
446        cas,
447    } = header;
448    out.extend_from_slice(&[
449        0x81,
450        opcode,
451        (key_len >> 8) as u8,
452        (key_len & 0xff) as u8,
453        extras_len,
454        0x00,
455        (status >> 8) as u8,
456        (status & 0xff) as u8,
457        ((body_len >> 24) & 0xff) as u8,
458        ((body_len >> 16) & 0xff) as u8,
459        ((body_len >> 8) & 0xff) as u8,
460        (body_len & 0xff) as u8,
461        ((opaque >> 24) & 0xff) as u8,
462        ((opaque >> 16) & 0xff) as u8,
463        ((opaque >> 8) & 0xff) as u8,
464        (opaque & 0xff) as u8,
465        ((cas >> 56) & 0xff) as u8,
466        ((cas >> 48) & 0xff) as u8,
467        ((cas >> 40) & 0xff) as u8,
468        ((cas >> 32) & 0xff) as u8,
469        ((cas >> 24) & 0xff) as u8,
470        ((cas >> 16) & 0xff) as u8,
471        ((cas >> 8) & 0xff) as u8,
472        (cas & 0xff) as u8,
473    ]);
474    24
475}
476
477fn encode_stat_line(meta: RequestMeta, line: &StatLine, out: &mut BytesMut) -> usize {
478    let opcode = meta.opcode;
479    let opaque = meta.opaque.unwrap_or(0);
480    let key_len = line.key.len() as u16;
481    let body_len = key_len as u32 + line.value.len() as u32;
482    encode_header(
483        out,
484        HeaderFields::new(opcode, 0, key_len, STATUS_SUCCESS, body_len, opaque, 0),
485    );
486    out.extend_from_slice(line.key.as_ref());
487    out.extend_from_slice(line.value.as_ref());
488    24 + body_len as usize
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;

    /// Builds a request frame whose body is just `key`: no extras, no value,
    /// zero data type, reserved and CAS fields.
    fn request_frame(magic: u8, opcode: u8, opaque: u32, key: &[u8]) -> BytesMut {
        let mut frame = BytesMut::with_capacity(24 + key.len());
        frame.extend_from_slice(&[magic, opcode]);
        frame.extend_from_slice(&(key.len() as u16).to_be_bytes()); // key length
        frame.extend_from_slice(&[0x00; 4]); // extras length, data type, reserved
        frame.extend_from_slice(&(key.len() as u32).to_be_bytes()); // body length
        frame.extend_from_slice(&opaque.to_be_bytes());
        frame.extend_from_slice(&[0x00; 8]); // cas
        frame.extend_from_slice(key);
        frame
    }

    /// A well-formed GET frame decodes into a `Get` request carrying its key.
    #[test]
    fn decode_get_request() {
        let mut buf = request_frame(0x80, 0x00, 0xdead_beef, b"foo");
        let mut decoder = BinaryDecoder::new();

        let (req, meta) = match decoder.decode(&mut buf, BinaryLimits::default()) {
            Some(DecodeOutcome::Request(req, meta)) => (req, meta),
            _ => panic!("unexpected outcome"),
        };

        assert_eq!(req.op, Op::Get);
        assert_eq!(req.key.unwrap(), Bytes::from_static(b"foo"));
        assert_eq!(meta.protocol, Protocol::Binary);
    }

    /// A header that starts with the response magic (0x81) yields an error response.
    #[test]
    fn decode_invalid_magic() {
        let mut buf = request_frame(0x81, 0x00, 0, b"");
        let mut decoder = BinaryDecoder::new();

        match decoder.decode(&mut buf, BinaryLimits::default()) {
            Some(DecodeOutcome::Response(_, Response::Error(_))) => {}
            _ => panic!("unexpected outcome"),
        }
    }
}
558}