1use bytes::{Bytes, BytesMut};
2
3use crate::codec::DecodeOutcome;
4use crate::error::Error;
5use crate::response::Response;
6use crate::types::{Op, Protocol, ReplyMode, Request, RequestMeta, StatLine};
7
8#[derive(Debug, Clone, Copy)]
10pub struct BinaryLimits {
11 pub max_frame_len: usize,
12}
13
14impl Default for BinaryLimits {
15 fn default() -> Self {
16 Self {
17 max_frame_len: 1 << 21,
18 }
19 }
20}
21
22#[derive(Debug, Clone, Default)]
24pub struct BinaryDecoder;
25
26impl BinaryDecoder {
27 pub fn new() -> Self {
28 Self
29 }
30
31 pub fn decode(&mut self, buf: &mut BytesMut, limits: BinaryLimits) -> Option<DecodeOutcome> {
32 if buf.len() < 24 {
33 return None;
34 }
35 let header = &buf[..24];
36 let magic = header[0];
37 let opcode = header[1];
38 let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
39 let extras_len = header[4] as usize;
40 let body_len = u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
41 let opaque = u32::from_be_bytes([header[12], header[13], header[14], header[15]]);
42 let cas = u64::from_be_bytes([
43 header[16], header[17], header[18], header[19], header[20], header[21], header[22],
44 header[23],
45 ]);
46
47 let mut meta = RequestMeta {
48 protocol: Protocol::Binary,
49 reply: ReplyMode::Always,
50 opaque: Some(opaque),
51 return_key: false,
52 opcode,
53 };
54
55 if magic != 0x80 {
56 let err = Error::server("invalid magic");
57 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
58 }
59 if body_len > limits.max_frame_len {
60 let err = Error::server("frame too large");
61 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
62 }
63 if buf.len() < 24 + body_len {
64 return None;
65 }
66
67 let frame = buf.split_to(24 + body_len).freeze();
68 let body = frame.slice(24..);
69
70 if extras_len + key_len > body_len {
71 let err = Error::client("invalid lengths");
72 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
73 }
74
75 let extras = body.slice(0..extras_len);
76 let key = body.slice(extras_len..extras_len + key_len);
77 let value = body.slice(extras_len + key_len..body_len);
78
79 let (op, quiet, return_key) = opcode_to_op(opcode);
80 meta.reply = if quiet {
81 ReplyMode::QuietBuffered
82 } else {
83 ReplyMode::Always
84 };
85 meta.return_key = return_key;
86
87 let mut req = Request::new(op);
88
89 if key_len > 0 {
90 if key_len > 250 {
91 let err = Error::client("key too long");
92 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
93 }
94 if !is_valid_key(&key) {
95 let err = Error::client("invalid key");
96 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
97 }
98 req.key = Some(key.clone());
99 }
100
101 if cas != 0 {
102 req.cas = Some(cas);
103 }
104
105 let result = match op {
106 Op::Get => {
107 if key_len == 0 || extras_len != 0 || !value.is_empty() {
108 Err("invalid get")
109 } else {
110 Ok(())
111 }
112 }
113 Op::Set | Op::Add | Op::Replace => {
114 if key_len == 0 || extras_len != 8 {
115 Err("invalid storage")
116 } else {
117 parse_flags_exptime(&extras, &mut req);
118 req.value = Some(value);
119 Ok(())
120 }
121 }
122 Op::Append | Op::Prepend => {
123 if key_len == 0 || (extras_len != 0 && extras_len != 8) {
124 Err("invalid append")
125 } else {
126 if extras_len == 8 {
127 parse_flags_exptime(&extras, &mut req);
128 }
129 req.value = Some(value);
130 Ok(())
131 }
132 }
133 Op::Delete => {
134 if key_len == 0 || extras_len != 0 || !value.is_empty() {
135 Err("invalid delete")
136 } else {
137 Ok(())
138 }
139 }
140 Op::Flush => {
141 if key_len != 0 || (extras_len != 0 && extras_len != 4) || !value.is_empty() {
142 Err("invalid flush")
143 } else {
144 if extras_len == 4 {
145 req.exptime =
146 Some(
147 u32::from_be_bytes([extras[0], extras[1], extras[2], extras[3]])
148 as i64,
149 );
150 }
151 Ok(())
152 }
153 }
154 Op::Incr | Op::Decr => {
155 if key_len == 0 || extras_len != 20 || !value.is_empty() {
156 Err("invalid incr")
157 } else {
158 parse_delta(&extras, &mut req);
159 Ok(())
160 }
161 }
162 Op::Touch => {
163 if key_len == 0 || extras_len != 4 || !value.is_empty() {
164 Err("invalid touch")
165 } else {
166 req.exptime =
167 Some(
168 u32::from_be_bytes([extras[0], extras[1], extras[2], extras[3]]) as i64,
169 );
170 Ok(())
171 }
172 }
173 Op::Stats => {
174 if extras_len != 0 {
175 Err("invalid stats")
176 } else {
177 if key_len > 0 {
178 req.key = Some(key);
179 }
180 Ok(())
181 }
182 }
183 Op::SaslListMechs => {
184 if extras_len != 0 || key_len != 0 || !value.is_empty() {
185 Err("invalid sasl list")
186 } else {
187 Ok(())
188 }
189 }
190 Op::SaslAuth | Op::SaslStep => {
191 if extras_len != 0 || key_len == 0 || value.is_empty() {
192 Err("invalid sasl auth")
193 } else {
194 req.value = Some(value);
195 Ok(())
196 }
197 }
198 Op::Version | Op::Noop | Op::Quit => {
199 if extras_len != 0 || key_len != 0 || !value.is_empty() {
200 Err("invalid command")
201 } else {
202 Ok(())
203 }
204 }
205 Op::Gets | Op::Gat | Op::Gats | Op::Cas => Ok(()),
206 Op::Unknown
207 | Op::MetaGet
208 | Op::MetaSet
209 | Op::MetaDelete
210 | Op::MetaArithmetic
211 | Op::MetaDebug
212 | Op::MetaNoop => Ok(()),
213 };
214
215 if result.is_err() {
216 let err = Error::client("invalid arguments");
217 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
218 }
219
220 Some(DecodeOutcome::Request(req, meta))
221 }
222}
223
224fn parse_flags_exptime(extras: &Bytes, req: &mut Request) {
225 if extras.len() < 8 {
226 return;
227 }
228 req.flags = Some(u32::from_be_bytes([
229 extras[0], extras[1], extras[2], extras[3],
230 ]));
231 req.exptime = Some(u32::from_be_bytes([extras[4], extras[5], extras[6], extras[7]]) as i64);
232}
233
234fn parse_delta(extras: &Bytes, req: &mut Request) {
235 if extras.len() < 20 {
236 return;
237 }
238 req.delta = Some(u64::from_be_bytes([
239 extras[0], extras[1], extras[2], extras[3], extras[4], extras[5], extras[6], extras[7],
240 ]));
241 req.initial = Some(u64::from_be_bytes([
242 extras[8], extras[9], extras[10], extras[11], extras[12], extras[13], extras[14],
243 extras[15],
244 ]));
245 req.exptime = Some(u32::from_be_bytes([extras[16], extras[17], extras[18], extras[19]]) as i64);
246}
247
248fn opcode_to_op(opcode: u8) -> (Op, bool, bool) {
249 match opcode {
250 0x00 => (Op::Get, false, false),
251 0x09 => (Op::Get, true, false),
252 0x0c => (Op::Get, false, true),
253 0x0d => (Op::Get, true, true),
254 0x01 => (Op::Set, false, false),
255 0x11 => (Op::Set, true, false),
256 0x02 => (Op::Add, false, false),
257 0x12 => (Op::Add, true, false),
258 0x03 => (Op::Replace, false, false),
259 0x13 => (Op::Replace, true, false),
260 0x04 => (Op::Delete, false, false),
261 0x14 => (Op::Delete, true, false),
262 0x05 => (Op::Incr, false, false),
263 0x15 => (Op::Incr, true, false),
264 0x06 => (Op::Decr, false, false),
265 0x16 => (Op::Decr, true, false),
266 0x07 => (Op::Quit, false, false),
267 0x17 => (Op::Quit, true, false),
268 0x0a => (Op::Noop, false, false),
269 0x0b => (Op::Version, false, false),
270 0x08 => (Op::Flush, false, false),
271 0x18 => (Op::Flush, true, false),
272 0x0e => (Op::Append, false, false),
273 0x19 => (Op::Append, true, false),
274 0x0f => (Op::Prepend, false, false),
275 0x1a => (Op::Prepend, true, false),
276 0x10 => (Op::Stats, false, false),
277 0x20 => (Op::SaslListMechs, false, false),
278 0x21 => (Op::SaslAuth, false, false),
279 0x22 => (Op::SaslStep, false, false),
280 _ => (Op::Unknown, false, false),
281 }
282}
283
284fn is_valid_key(key: &Bytes) -> bool {
285 if key.is_empty() || key.len() > 250 {
286 return false;
287 }
288 for &b in key.as_ref() {
289 if b <= b' ' || b == 0x7f {
290 return false;
291 }
292 }
293 true
294}
295
296pub const STATUS_SUCCESS: u16 = 0x0000;
297pub const STATUS_KEY_NOT_FOUND: u16 = 0x0001;
298pub const STATUS_KEY_EXISTS: u16 = 0x0002;
299pub const STATUS_ITEM_NOT_STORED: u16 = 0x0005;
300pub const STATUS_INVALID_ARGUMENTS: u16 = 0x0004;
301pub const STATUS_AUTH_ERROR: u16 = 0x0020;
302pub const STATUS_UNKNOWN_COMMAND: u16 = 0x0081;
303pub const STATUS_INTERNAL_ERROR: u16 = 0x0084;
304
305pub fn encode_binary_response(
306 meta: RequestMeta,
307 response: &Response,
308 out: &mut BytesMut,
309 return_key: bool,
310) -> (u16, usize) {
311 let opcode = meta.opcode;
312 let opaque = meta.opaque.unwrap_or(0);
313
314 match response {
315 Response::Stored
316 | Response::Deleted
317 | Response::Touched
318 | Response::Noop
319 | Response::Ok => {
320 encode_header(
321 out,
322 HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
323 );
324 (STATUS_SUCCESS, 24)
325 }
326 Response::NotStored => {
327 encode_header(
328 out,
329 HeaderFields::new(opcode, 0, 0, STATUS_ITEM_NOT_STORED, 0, opaque, 0),
330 );
331 (STATUS_ITEM_NOT_STORED, 24)
332 }
333 Response::Exists => {
334 encode_header(
335 out,
336 HeaderFields::new(opcode, 0, 0, STATUS_KEY_EXISTS, 0, opaque, 0),
337 );
338 (STATUS_KEY_EXISTS, 24)
339 }
340 Response::NotFound => {
341 encode_header(
342 out,
343 HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
344 );
345 (STATUS_KEY_NOT_FOUND, 24)
346 }
347 Response::Numeric(value) => {
348 let extras_len = 0u8;
349 let key_len = 0u16;
350 let body_len = 8u32;
351 encode_header(
352 out,
353 HeaderFields::new(
354 opcode,
355 extras_len,
356 key_len,
357 STATUS_SUCCESS,
358 body_len,
359 opaque,
360 0,
361 ),
362 );
363 out.extend_from_slice(&value.to_be_bytes());
364 (STATUS_SUCCESS, 24 + 8)
365 }
366 Response::Value(entry) => {
367 let extras_len = 4u8;
368 let key = if return_key { entry.key.as_ref() } else { b"" };
369 let key_len = key.len() as u16;
370 let body_len = extras_len as u32 + key_len as u32 + entry.value.len() as u32;
371 encode_header(
372 out,
373 HeaderFields::new(
374 opcode,
375 extras_len,
376 key_len,
377 STATUS_SUCCESS,
378 body_len,
379 opaque,
380 entry.cas.unwrap_or(0),
381 ),
382 );
383 out.extend_from_slice(&entry.flags.to_be_bytes());
384 if return_key {
385 out.extend_from_slice(key);
386 }
387 out.extend_from_slice(entry.value.as_ref());
388 (STATUS_SUCCESS, 24 + body_len as usize)
389 }
390 Response::Values(entries) => {
391 if let Some(entry) = entries.first() {
392 encode_binary_response(meta, &Response::Value(entry.clone()), out, return_key)
393 } else {
394 encode_header(
395 out,
396 HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
397 );
398 (STATUS_KEY_NOT_FOUND, 24)
399 }
400 }
401 Response::Stats(lines) => {
402 let mut total = 0usize;
403 for line in lines {
404 total += encode_stat_line(meta, line, out);
405 }
406 total += encode_header(
407 out,
408 HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
409 );
410 (STATUS_SUCCESS, total)
411 }
412 Response::Version(version) => {
413 let body_len = version.len() as u32;
414 encode_header(
415 out,
416 HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, body_len, opaque, 0),
417 );
418 out.extend_from_slice(version.as_ref());
419 (STATUS_SUCCESS, 24 + version.len())
420 }
421 Response::Error(err) => {
422 let status = match err.kind {
423 crate::error::ErrorKind::UnknownCommand => STATUS_UNKNOWN_COMMAND,
424 crate::error::ErrorKind::Client => STATUS_INVALID_ARGUMENTS,
425 crate::error::ErrorKind::Server => STATUS_INTERNAL_ERROR,
426 crate::error::ErrorKind::Auth => STATUS_AUTH_ERROR,
427 };
428 let body_len = err.message.len() as u32;
429 encode_header(
430 out,
431 HeaderFields::new(opcode, 0, 0, status, body_len, opaque, 0),
432 );
433 out.extend_from_slice(err.message.as_ref());
434 (status, 24 + err.message.len())
435 }
436 Response::Meta(_) | Response::ValuesStream(_) | Response::StatsStream(_) => {
437 encode_header(
438 out,
439 HeaderFields::new(opcode, 0, 0, STATUS_INTERNAL_ERROR, 0, opaque, 0),
440 );
441 (STATUS_INTERNAL_ERROR, 24)
442 }
443 }
444}
445
446struct HeaderFields {
447 opcode: u8,
448 extras_len: u8,
449 key_len: u16,
450 status: u16,
451 body_len: u32,
452 opaque: u32,
453 cas: u64,
454}
455
456impl HeaderFields {
457 fn new(
458 opcode: u8,
459 extras_len: u8,
460 key_len: u16,
461 status: u16,
462 body_len: u32,
463 opaque: u32,
464 cas: u64,
465 ) -> Self {
466 Self {
467 opcode,
468 extras_len,
469 key_len,
470 status,
471 body_len,
472 opaque,
473 cas,
474 }
475 }
476}
477
478fn encode_header(out: &mut BytesMut, header: HeaderFields) -> usize {
479 let HeaderFields {
480 opcode,
481 extras_len,
482 key_len,
483 status,
484 body_len,
485 opaque,
486 cas,
487 } = header;
488 out.extend_from_slice(&[
489 0x81,
490 opcode,
491 (key_len >> 8) as u8,
492 (key_len & 0xff) as u8,
493 extras_len,
494 0x00,
495 (status >> 8) as u8,
496 (status & 0xff) as u8,
497 ((body_len >> 24) & 0xff) as u8,
498 ((body_len >> 16) & 0xff) as u8,
499 ((body_len >> 8) & 0xff) as u8,
500 (body_len & 0xff) as u8,
501 ((opaque >> 24) & 0xff) as u8,
502 ((opaque >> 16) & 0xff) as u8,
503 ((opaque >> 8) & 0xff) as u8,
504 (opaque & 0xff) as u8,
505 ((cas >> 56) & 0xff) as u8,
506 ((cas >> 48) & 0xff) as u8,
507 ((cas >> 40) & 0xff) as u8,
508 ((cas >> 32) & 0xff) as u8,
509 ((cas >> 24) & 0xff) as u8,
510 ((cas >> 16) & 0xff) as u8,
511 ((cas >> 8) & 0xff) as u8,
512 (cas & 0xff) as u8,
513 ]);
514 24
515}
516
517fn encode_stat_line(meta: RequestMeta, line: &StatLine, out: &mut BytesMut) -> usize {
518 let opcode = meta.opcode;
519 let opaque = meta.opaque.unwrap_or(0);
520 let key_len = line.key.len() as u16;
521 let body_len = key_len as u32 + line.value.len() as u32;
522 encode_header(
523 out,
524 HeaderFields::new(opcode, 0, key_len, STATUS_SUCCESS, body_len, opaque, 0),
525 );
526 out.extend_from_slice(line.key.as_ref());
527 out.extend_from_slice(line.value.as_ref());
528 24 + body_len as usize
529}
530
531#[cfg(test)]
532mod tests {
533 use super::*;
534 use bytes::BytesMut;
535
536 #[test]
537 fn decode_get_request() {
538 let key = b"foo";
539 let mut buf = BytesMut::with_capacity(24 + key.len());
540 buf.extend_from_slice(&[
541 0x80,
542 0x00, 0x00,
544 key.len() as u8, 0x00, 0x00, 0x00,
548 0x00, 0x00,
550 0x00,
551 0x00,
552 key.len() as u8, 0xde,
554 0xad,
555 0xbe,
556 0xef, 0x00,
558 0x00,
559 0x00,
560 0x00,
561 0x00,
562 0x00,
563 0x00,
564 0x00, ]);
566 buf.extend_from_slice(key);
567 let mut decoder = BinaryDecoder::new();
568 let outcome = decoder.decode(&mut buf, BinaryLimits::default());
569 let (req, meta) = match outcome {
570 Some(DecodeOutcome::Request(req, meta)) => (req, meta),
571 _ => panic!("unexpected outcome"),
572 };
573 assert_eq!(req.op, Op::Get);
574 assert_eq!(req.key.unwrap(), Bytes::from_static(b"foo"));
575 assert_eq!(meta.protocol, Protocol::Binary);
576 }
577
578 #[test]
579 fn decode_invalid_magic() {
580 let mut buf = BytesMut::with_capacity(24);
581 buf.extend_from_slice(&[
582 0x81, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ]);
591 let mut decoder = BinaryDecoder::new();
592 let outcome = decoder.decode(&mut buf, BinaryLimits::default());
593 match outcome {
594 Some(DecodeOutcome::Response(_, Response::Error(_))) => {}
595 _ => panic!("unexpected outcome"),
596 }
597 }
598
599 #[test]
600 fn decode_flush_with_exptime() {
601 let mut buf = BytesMut::with_capacity(28);
602 buf.extend_from_slice(&[
603 0x80, 0x08, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ]);
612 buf.extend_from_slice(&[0x00, 0x00, 0x00, 0x0a]);
613 let mut decoder = BinaryDecoder::new();
614 let outcome = decoder.decode(&mut buf, BinaryLimits::default());
615 let (req, _meta) = match outcome {
616 Some(DecodeOutcome::Request(req, meta)) => (req, meta),
617 _ => panic!("unexpected outcome"),
618 };
619 assert_eq!(req.op, Op::Flush);
620 assert_eq!(req.exptime, Some(10));
621 }
622
623 #[test]
624 fn decode_sasl_auth() {
625 let mechanism = b"PLAIN";
626 let payload = b"\0user\0pass";
627 let body_len = mechanism.len() + payload.len();
628 let mut buf = BytesMut::with_capacity(24 + body_len);
629 buf.extend_from_slice(&[
630 0x80,
631 0x21, 0x00,
633 mechanism.len() as u8, 0x00, 0x00, 0x00,
637 0x00, 0x00,
639 0x00,
640 0x00,
641 body_len as u8, 0x00,
643 0x00,
644 0x00,
645 0x00, 0x00,
647 0x00,
648 0x00,
649 0x00,
650 0x00,
651 0x00,
652 0x00,
653 0x00, ]);
655 buf.extend_from_slice(mechanism);
656 buf.extend_from_slice(payload);
657 let mut decoder = BinaryDecoder::new();
658 let outcome = decoder.decode(&mut buf, BinaryLimits::default());
659 let (req, _meta) = match outcome {
660 Some(DecodeOutcome::Request(req, meta)) => (req, meta),
661 _ => panic!("unexpected outcome"),
662 };
663 assert_eq!(req.op, Op::SaslAuth);
664 assert_eq!(req.key.unwrap(), Bytes::from_static(b"PLAIN"));
665 assert_eq!(req.value.unwrap(), Bytes::from_static(b"\0user\0pass"));
666 }
667}