Skip to main content

memcached_async/codec/
binary.rs

1use bytes::{Bytes, BytesMut};
2
3use crate::codec::DecodeOutcome;
4use crate::error::Error;
5use crate::response::Response;
6use crate::types::{Op, Protocol, ReplyMode, Request, RequestMeta, StatLine};
7
/// Size limits applied while decoding binary-protocol frames.
#[derive(Debug, Clone, Copy)]
pub struct BinaryLimits {
    /// Largest body length, in bytes, that a frame header may declare.
    pub max_frame_len: usize,
}

impl Default for BinaryLimits {
    /// Returns limits that allow bodies of up to 2 MiB (`1 << 21` bytes).
    fn default() -> Self {
        const TWO_MIB: usize = 2 * 1024 * 1024;
        BinaryLimits {
            max_frame_len: TWO_MIB,
        }
    }
}
21
22/// Binary protocol decoder.
23#[derive(Debug, Clone, Default)]
24pub struct BinaryDecoder;
25
26impl BinaryDecoder {
27    pub fn new() -> Self {
28        Self
29    }
30
31    pub fn decode(&mut self, buf: &mut BytesMut, limits: BinaryLimits) -> Option<DecodeOutcome> {
32        if buf.len() < 24 {
33            return None;
34        }
35        let header = &buf[..24];
36        let magic = header[0];
37        let opcode = header[1];
38        let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
39        let extras_len = header[4] as usize;
40        let body_len = u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
41        let opaque = u32::from_be_bytes([header[12], header[13], header[14], header[15]]);
42        let cas = u64::from_be_bytes([
43            header[16], header[17], header[18], header[19], header[20], header[21], header[22],
44            header[23],
45        ]);
46
47        let mut meta = RequestMeta {
48            protocol: Protocol::Binary,
49            reply: ReplyMode::Always,
50            opaque: Some(opaque),
51            return_key: false,
52            opcode,
53        };
54
55        if magic != 0x80 {
56            let err = Error::server("invalid magic");
57            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
58        }
59        if body_len > limits.max_frame_len {
60            let err = Error::server("frame too large");
61            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
62        }
63        if buf.len() < 24 + body_len {
64            return None;
65        }
66
67        let frame = buf.split_to(24 + body_len).freeze();
68        let body = frame.slice(24..);
69
70        if extras_len + key_len > body_len {
71            let err = Error::client("invalid lengths");
72            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
73        }
74
75        let extras = body.slice(0..extras_len);
76        let key = body.slice(extras_len..extras_len + key_len);
77        let value = body.slice(extras_len + key_len..body_len);
78
79        let (op, quiet, return_key) = opcode_to_op(opcode);
80        meta.reply = if quiet {
81            ReplyMode::QuietBuffered
82        } else {
83            ReplyMode::Always
84        };
85        meta.return_key = return_key;
86
87        let mut req = Request::new(op);
88
89        if key_len > 0 {
90            if key_len > 250 {
91                let err = Error::client("key too long");
92                return Some(DecodeOutcome::Response(meta, Response::Error(err)));
93            }
94            if !is_valid_key(&key) {
95                let err = Error::client("invalid key");
96                return Some(DecodeOutcome::Response(meta, Response::Error(err)));
97            }
98            req.key = Some(key.clone());
99        }
100
101        if cas != 0 {
102            req.cas = Some(cas);
103        }
104
105        let result = match op {
106            Op::Get => {
107                if key_len == 0 || extras_len != 0 || !value.is_empty() {
108                    Err("invalid get")
109                } else {
110                    Ok(())
111                }
112            }
113            Op::Set | Op::Add | Op::Replace => {
114                if key_len == 0 || extras_len != 8 {
115                    Err("invalid storage")
116                } else {
117                    parse_flags_exptime(&extras, &mut req);
118                    req.value = Some(value);
119                    Ok(())
120                }
121            }
122            Op::Append | Op::Prepend => {
123                if key_len == 0 || (extras_len != 0 && extras_len != 8) {
124                    Err("invalid append")
125                } else {
126                    if extras_len == 8 {
127                        parse_flags_exptime(&extras, &mut req);
128                    }
129                    req.value = Some(value);
130                    Ok(())
131                }
132            }
133            Op::Delete => {
134                if key_len == 0 || extras_len != 0 || !value.is_empty() {
135                    Err("invalid delete")
136                } else {
137                    Ok(())
138                }
139            }
140            Op::Flush => {
141                if key_len != 0 || (extras_len != 0 && extras_len != 4) || !value.is_empty() {
142                    Err("invalid flush")
143                } else {
144                    if extras_len == 4 {
145                        req.exptime =
146                            Some(
147                                u32::from_be_bytes([extras[0], extras[1], extras[2], extras[3]])
148                                    as i64,
149                            );
150                    }
151                    Ok(())
152                }
153            }
154            Op::Incr | Op::Decr => {
155                if key_len == 0 || extras_len != 20 || !value.is_empty() {
156                    Err("invalid incr")
157                } else {
158                    parse_delta(&extras, &mut req);
159                    Ok(())
160                }
161            }
162            Op::Touch => {
163                if key_len == 0 || extras_len != 4 || !value.is_empty() {
164                    Err("invalid touch")
165                } else {
166                    req.exptime =
167                        Some(
168                            u32::from_be_bytes([extras[0], extras[1], extras[2], extras[3]]) as i64,
169                        );
170                    Ok(())
171                }
172            }
173            Op::Stats => {
174                if extras_len != 0 {
175                    Err("invalid stats")
176                } else {
177                    if key_len > 0 {
178                        req.key = Some(key);
179                    }
180                    Ok(())
181                }
182            }
183            Op::SaslListMechs => {
184                if extras_len != 0 || key_len != 0 || !value.is_empty() {
185                    Err("invalid sasl list")
186                } else {
187                    Ok(())
188                }
189            }
190            Op::SaslAuth | Op::SaslStep => {
191                if extras_len != 0 || key_len == 0 || value.is_empty() {
192                    Err("invalid sasl auth")
193                } else {
194                    req.value = Some(value);
195                    Ok(())
196                }
197            }
198            Op::Version | Op::Noop | Op::Quit => {
199                if extras_len != 0 || key_len != 0 || !value.is_empty() {
200                    Err("invalid command")
201                } else {
202                    Ok(())
203                }
204            }
205            Op::Gets | Op::Gat | Op::Gats | Op::Cas => Ok(()),
206            Op::Unknown
207            | Op::MetaGet
208            | Op::MetaSet
209            | Op::MetaDelete
210            | Op::MetaArithmetic
211            | Op::MetaDebug
212            | Op::MetaNoop => Ok(()),
213        };
214
215        if result.is_err() {
216            let err = Error::client("invalid arguments");
217            return Some(DecodeOutcome::Response(meta, Response::Error(err)));
218        }
219
220        Some(DecodeOutcome::Request(req, meta))
221    }
222}
223
224fn parse_flags_exptime(extras: &Bytes, req: &mut Request) {
225    if extras.len() < 8 {
226        return;
227    }
228    req.flags = Some(u32::from_be_bytes([
229        extras[0], extras[1], extras[2], extras[3],
230    ]));
231    req.exptime = Some(u32::from_be_bytes([extras[4], extras[5], extras[6], extras[7]]) as i64);
232}
233
234fn parse_delta(extras: &Bytes, req: &mut Request) {
235    if extras.len() < 20 {
236        return;
237    }
238    req.delta = Some(u64::from_be_bytes([
239        extras[0], extras[1], extras[2], extras[3], extras[4], extras[5], extras[6], extras[7],
240    ]));
241    req.initial = Some(u64::from_be_bytes([
242        extras[8], extras[9], extras[10], extras[11], extras[12], extras[13], extras[14],
243        extras[15],
244    ]));
245    req.exptime = Some(u32::from_be_bytes([extras[16], extras[17], extras[18], extras[19]]) as i64);
246}
247
248fn opcode_to_op(opcode: u8) -> (Op, bool, bool) {
249    match opcode {
250        0x00 => (Op::Get, false, false),
251        0x09 => (Op::Get, true, false),
252        0x0c => (Op::Get, false, true),
253        0x0d => (Op::Get, true, true),
254        0x01 => (Op::Set, false, false),
255        0x11 => (Op::Set, true, false),
256        0x02 => (Op::Add, false, false),
257        0x12 => (Op::Add, true, false),
258        0x03 => (Op::Replace, false, false),
259        0x13 => (Op::Replace, true, false),
260        0x04 => (Op::Delete, false, false),
261        0x14 => (Op::Delete, true, false),
262        0x05 => (Op::Incr, false, false),
263        0x15 => (Op::Incr, true, false),
264        0x06 => (Op::Decr, false, false),
265        0x16 => (Op::Decr, true, false),
266        0x07 => (Op::Quit, false, false),
267        0x17 => (Op::Quit, true, false),
268        0x0a => (Op::Noop, false, false),
269        0x0b => (Op::Version, false, false),
270        0x08 => (Op::Flush, false, false),
271        0x18 => (Op::Flush, true, false),
272        0x0e => (Op::Append, false, false),
273        0x19 => (Op::Append, true, false),
274        0x0f => (Op::Prepend, false, false),
275        0x1a => (Op::Prepend, true, false),
276        0x10 => (Op::Stats, false, false),
277        0x20 => (Op::SaslListMechs, false, false),
278        0x21 => (Op::SaslAuth, false, false),
279        0x22 => (Op::SaslStep, false, false),
280        _ => (Op::Unknown, false, false),
281    }
282}
283
284fn is_valid_key(key: &Bytes) -> bool {
285    if key.is_empty() || key.len() > 250 {
286        return false;
287    }
288    for &b in key.as_ref() {
289        if b <= b' ' || b == 0x7f {
290            return false;
291        }
292    }
293    true
294}
295
// Status codes written into binary response headers, in ascending numeric order.

/// Status `0x0000`: the request succeeded.
pub const STATUS_SUCCESS: u16 = 0x0000;
/// Status `0x0001`: the key was not found.
pub const STATUS_KEY_NOT_FOUND: u16 = 0x0001;
/// Status `0x0002`: the key already exists.
pub const STATUS_KEY_EXISTS: u16 = 0x0002;
/// Status `0x0004`: the request carried invalid arguments.
pub const STATUS_INVALID_ARGUMENTS: u16 = 0x0004;
/// Status `0x0005`: the item was not stored.
pub const STATUS_ITEM_NOT_STORED: u16 = 0x0005;
/// Status `0x0020`: authentication error.
pub const STATUS_AUTH_ERROR: u16 = 0x0020;
/// Status `0x0081`: the command is unknown.
pub const STATUS_UNKNOWN_COMMAND: u16 = 0x0081;
/// Status `0x0084`: internal error.
pub const STATUS_INTERNAL_ERROR: u16 = 0x0084;
304
305pub fn encode_binary_response(
306    meta: RequestMeta,
307    response: &Response,
308    out: &mut BytesMut,
309    return_key: bool,
310) -> (u16, usize) {
311    let opcode = meta.opcode;
312    let opaque = meta.opaque.unwrap_or(0);
313
314    match response {
315        Response::Stored
316        | Response::Deleted
317        | Response::Touched
318        | Response::Noop
319        | Response::Ok => {
320            encode_header(
321                out,
322                HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
323            );
324            (STATUS_SUCCESS, 24)
325        }
326        Response::NotStored => {
327            encode_header(
328                out,
329                HeaderFields::new(opcode, 0, 0, STATUS_ITEM_NOT_STORED, 0, opaque, 0),
330            );
331            (STATUS_ITEM_NOT_STORED, 24)
332        }
333        Response::Exists => {
334            encode_header(
335                out,
336                HeaderFields::new(opcode, 0, 0, STATUS_KEY_EXISTS, 0, opaque, 0),
337            );
338            (STATUS_KEY_EXISTS, 24)
339        }
340        Response::NotFound => {
341            encode_header(
342                out,
343                HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
344            );
345            (STATUS_KEY_NOT_FOUND, 24)
346        }
347        Response::Numeric(value) => {
348            let extras_len = 0u8;
349            let key_len = 0u16;
350            let body_len = 8u32;
351            encode_header(
352                out,
353                HeaderFields::new(
354                    opcode,
355                    extras_len,
356                    key_len,
357                    STATUS_SUCCESS,
358                    body_len,
359                    opaque,
360                    0,
361                ),
362            );
363            out.extend_from_slice(&value.to_be_bytes());
364            (STATUS_SUCCESS, 24 + 8)
365        }
366        Response::Value(entry) => {
367            let extras_len = 4u8;
368            let key = if return_key { entry.key.as_ref() } else { b"" };
369            let key_len = key.len() as u16;
370            let body_len = extras_len as u32 + key_len as u32 + entry.value.len() as u32;
371            encode_header(
372                out,
373                HeaderFields::new(
374                    opcode,
375                    extras_len,
376                    key_len,
377                    STATUS_SUCCESS,
378                    body_len,
379                    opaque,
380                    entry.cas.unwrap_or(0),
381                ),
382            );
383            out.extend_from_slice(&entry.flags.to_be_bytes());
384            if return_key {
385                out.extend_from_slice(key);
386            }
387            out.extend_from_slice(entry.value.as_ref());
388            (STATUS_SUCCESS, 24 + body_len as usize)
389        }
390        Response::Values(entries) => {
391            if let Some(entry) = entries.first() {
392                encode_binary_response(meta, &Response::Value(entry.clone()), out, return_key)
393            } else {
394                encode_header(
395                    out,
396                    HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
397                );
398                (STATUS_KEY_NOT_FOUND, 24)
399            }
400        }
401        Response::Stats(lines) => {
402            let mut total = 0usize;
403            for line in lines {
404                total += encode_stat_line(meta, line, out);
405            }
406            total += encode_header(
407                out,
408                HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
409            );
410            (STATUS_SUCCESS, total)
411        }
412        Response::Version(version) => {
413            let body_len = version.len() as u32;
414            encode_header(
415                out,
416                HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, body_len, opaque, 0),
417            );
418            out.extend_from_slice(version.as_ref());
419            (STATUS_SUCCESS, 24 + version.len())
420        }
421        Response::Error(err) => {
422            let status = match err.kind {
423                crate::error::ErrorKind::UnknownCommand => STATUS_UNKNOWN_COMMAND,
424                crate::error::ErrorKind::Client => STATUS_INVALID_ARGUMENTS,
425                crate::error::ErrorKind::Server => STATUS_INTERNAL_ERROR,
426                crate::error::ErrorKind::Auth => STATUS_AUTH_ERROR,
427            };
428            let body_len = err.message.len() as u32;
429            encode_header(
430                out,
431                HeaderFields::new(opcode, 0, 0, status, body_len, opaque, 0),
432            );
433            out.extend_from_slice(err.message.as_ref());
434            (status, 24 + err.message.len())
435        }
436        Response::Meta(_) | Response::ValuesStream(_) | Response::StatsStream(_) => {
437            encode_header(
438                out,
439                HeaderFields::new(opcode, 0, 0, STATUS_INTERNAL_ERROR, 0, opaque, 0),
440            );
441            (STATUS_INTERNAL_ERROR, 24)
442        }
443    }
444}
445
/// The values that vary between binary response headers; consumed by `encode_header`.
struct HeaderFields {
    /// Command opcode.
    opcode: u8,
    /// Length of the extras section in bytes.
    extras_len: u8,
    /// Length of the key in bytes.
    key_len: u16,
    /// Response status code.
    status: u16,
    /// Length in bytes of the body that follows the header.
    body_len: u32,
    /// Opaque value written into the header.
    opaque: u32,
    /// CAS value written into the header.
    cas: u64,
}

impl HeaderFields {
    /// Bundles the header values. Note that the argument order differs from the
    /// order used on the wire: `extras_len` comes before `key_len` here.
    fn new(
        opcode: u8,
        extras_len: u8,
        key_len: u16,
        status: u16,
        body_len: u32,
        opaque: u32,
        cas: u64,
    ) -> Self {
        HeaderFields {
            opcode,
            extras_len,
            key_len,
            status,
            body_len,
            opaque,
            cas,
        }
    }
}
477
478fn encode_header(out: &mut BytesMut, header: HeaderFields) -> usize {
479    let HeaderFields {
480        opcode,
481        extras_len,
482        key_len,
483        status,
484        body_len,
485        opaque,
486        cas,
487    } = header;
488    out.extend_from_slice(&[
489        0x81,
490        opcode,
491        (key_len >> 8) as u8,
492        (key_len & 0xff) as u8,
493        extras_len,
494        0x00,
495        (status >> 8) as u8,
496        (status & 0xff) as u8,
497        ((body_len >> 24) & 0xff) as u8,
498        ((body_len >> 16) & 0xff) as u8,
499        ((body_len >> 8) & 0xff) as u8,
500        (body_len & 0xff) as u8,
501        ((opaque >> 24) & 0xff) as u8,
502        ((opaque >> 16) & 0xff) as u8,
503        ((opaque >> 8) & 0xff) as u8,
504        (opaque & 0xff) as u8,
505        ((cas >> 56) & 0xff) as u8,
506        ((cas >> 48) & 0xff) as u8,
507        ((cas >> 40) & 0xff) as u8,
508        ((cas >> 32) & 0xff) as u8,
509        ((cas >> 24) & 0xff) as u8,
510        ((cas >> 16) & 0xff) as u8,
511        ((cas >> 8) & 0xff) as u8,
512        (cas & 0xff) as u8,
513    ]);
514    24
515}
516
517fn encode_stat_line(meta: RequestMeta, line: &StatLine, out: &mut BytesMut) -> usize {
518    let opcode = meta.opcode;
519    let opaque = meta.opaque.unwrap_or(0);
520    let key_len = line.key.len() as u16;
521    let body_len = key_len as u32 + line.value.len() as u32;
522    encode_header(
523        out,
524        HeaderFields::new(opcode, 0, key_len, STATUS_SUCCESS, body_len, opaque, 0),
525    );
526    out.extend_from_slice(line.key.as_ref());
527    out.extend_from_slice(line.value.as_ref());
528    24 + body_len as usize
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;

    /// Builds a raw frame: a 24-byte header (data type, reserved and cas all zero)
    /// followed by `extras`, `key` and `value`. The length fields are derived from
    /// the slices.
    fn frame(
        magic: u8,
        opcode: u8,
        opaque: u32,
        extras: &[u8],
        key: &[u8],
        value: &[u8],
    ) -> BytesMut {
        let body_len = extras.len() + key.len() + value.len();
        let mut buf = BytesMut::with_capacity(24 + body_len);
        buf.extend_from_slice(&[magic, opcode]);
        buf.extend_from_slice(&(key.len() as u16).to_be_bytes()); // key length
        buf.extend_from_slice(&[extras.len() as u8, 0x00]); // extras length, data type
        buf.extend_from_slice(&[0x00, 0x00]); // reserved
        buf.extend_from_slice(&(body_len as u32).to_be_bytes()); // body length
        buf.extend_from_slice(&opaque.to_be_bytes()); // opaque
        buf.extend_from_slice(&[0x00; 8]); // cas
        buf.extend_from_slice(extras);
        buf.extend_from_slice(key);
        buf.extend_from_slice(value);
        buf
    }

    /// Runs a fresh decoder with default limits over `buf`.
    fn decode_one(mut buf: BytesMut) -> Option<DecodeOutcome> {
        let mut decoder = BinaryDecoder::new();
        decoder.decode(&mut buf, BinaryLimits::default())
    }

    /// Unwraps a decoded request, panicking on any other outcome.
    fn expect_request(outcome: Option<DecodeOutcome>) -> (Request, RequestMeta) {
        match outcome {
            Some(DecodeOutcome::Request(req, meta)) => (req, meta),
            _ => panic!("unexpected outcome"),
        }
    }

    #[test]
    fn decode_get_request() {
        let buf = frame(0x80, 0x00, 0xdead_beef, b"", b"foo", b"");
        let (req, meta) = expect_request(decode_one(buf));
        assert_eq!(req.op, Op::Get);
        assert_eq!(req.key.unwrap(), Bytes::from_static(b"foo"));
        assert_eq!(meta.protocol, Protocol::Binary);
    }

    #[test]
    fn decode_invalid_magic() {
        // 0x81 is not accepted as the magic byte of a request.
        let buf = frame(0x81, 0x00, 0, b"", b"", b"");
        match decode_one(buf) {
            Some(DecodeOutcome::Response(_, Response::Error(_))) => {}
            _ => panic!("unexpected outcome"),
        }
    }

    #[test]
    fn decode_flush_with_exptime() {
        // Opcode 0x08 is flush; its optional extras carry a 4-byte expiration.
        let buf = frame(0x80, 0x08, 0, &[0x00, 0x00, 0x00, 0x0a], b"", b"");
        let (req, _meta) = expect_request(decode_one(buf));
        assert_eq!(req.op, Op::Flush);
        assert_eq!(req.exptime, Some(10));
    }

    #[test]
    fn decode_sasl_auth() {
        // Opcode 0x21 is sasl auth: the key is the mechanism, the value the payload.
        let buf = frame(0x80, 0x21, 0, b"", b"PLAIN", b"\0user\0pass");
        let (req, _meta) = expect_request(decode_one(buf));
        assert_eq!(req.op, Op::SaslAuth);
        assert_eq!(req.key.unwrap(), Bytes::from_static(b"PLAIN"));
        assert_eq!(req.value.unwrap(), Bytes::from_static(b"\0user\0pass"));
    }
}