1use bytes::{Bytes, BytesMut};
2
3use crate::codec::DecodeOutcome;
4use crate::error::Error;
5use crate::response::Response;
6use crate::types::{Op, Protocol, ReplyMode, Request, RequestMeta, StatLine};
7
8#[derive(Debug, Clone, Copy)]
10pub struct BinaryLimits {
11 pub max_frame_len: usize,
12}
13
14impl Default for BinaryLimits {
15 fn default() -> Self {
16 Self {
17 max_frame_len: 1 << 21,
18 }
19 }
20}
21
22#[derive(Debug, Clone, Default)]
24pub struct BinaryDecoder;
25
26impl BinaryDecoder {
27 pub fn new() -> Self {
28 Self
29 }
30
31 pub fn decode(&mut self, buf: &mut BytesMut, limits: BinaryLimits) -> Option<DecodeOutcome> {
32 if buf.len() < 24 {
33 return None;
34 }
35 let header = &buf[..24];
36 let magic = header[0];
37 let opcode = header[1];
38 let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
39 let extras_len = header[4] as usize;
40 let body_len = u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
41 let opaque = u32::from_be_bytes([header[12], header[13], header[14], header[15]]);
42 let cas = u64::from_be_bytes([
43 header[16], header[17], header[18], header[19], header[20], header[21], header[22],
44 header[23],
45 ]);
46
47 let mut meta = RequestMeta {
48 protocol: Protocol::Binary,
49 reply: ReplyMode::Always,
50 opaque: Some(opaque),
51 return_key: false,
52 opcode,
53 };
54
55 if magic != 0x80 {
56 let err = Error::server("invalid magic");
57 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
58 }
59 if body_len > limits.max_frame_len {
60 let err = Error::server("frame too large");
61 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
62 }
63 if buf.len() < 24 + body_len {
64 return None;
65 }
66
67 let frame = buf.split_to(24 + body_len).freeze();
68 let body = frame.slice(24..);
69
70 if extras_len + key_len > body_len {
71 let err = Error::client("invalid lengths");
72 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
73 }
74
75 let extras = body.slice(0..extras_len);
76 let key = body.slice(extras_len..extras_len + key_len);
77 let value = body.slice(extras_len + key_len..body_len);
78
79 let (op, quiet, return_key) = opcode_to_op(opcode);
80 meta.reply = if quiet {
81 ReplyMode::QuietBuffered
82 } else {
83 ReplyMode::Always
84 };
85 meta.return_key = return_key;
86
87 let mut req = Request::new(op);
88
89 if key_len > 0 {
90 if key_len > 250 {
91 let err = Error::client("key too long");
92 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
93 }
94 if !is_valid_key(&key) {
95 let err = Error::client("invalid key");
96 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
97 }
98 req.key = Some(key.clone());
99 }
100
101 if cas != 0 {
102 req.cas = Some(cas);
103 }
104
105 let result = match op {
106 Op::Get => {
107 if key_len == 0 || extras_len != 0 || !value.is_empty() {
108 Err("invalid get")
109 } else {
110 Ok(())
111 }
112 }
113 Op::Set | Op::Add | Op::Replace => {
114 if key_len == 0 || extras_len != 8 {
115 Err("invalid storage")
116 } else {
117 parse_flags_exptime(&extras, &mut req);
118 req.value = Some(value);
119 Ok(())
120 }
121 }
122 Op::Append | Op::Prepend => {
123 if key_len == 0 || (extras_len != 0 && extras_len != 8) {
124 Err("invalid append")
125 } else {
126 if extras_len == 8 {
127 parse_flags_exptime(&extras, &mut req);
128 }
129 req.value = Some(value);
130 Ok(())
131 }
132 }
133 Op::Delete => {
134 if key_len == 0 || extras_len != 0 || !value.is_empty() {
135 Err("invalid delete")
136 } else {
137 Ok(())
138 }
139 }
140 Op::Incr | Op::Decr => {
141 if key_len == 0 || extras_len != 20 || !value.is_empty() {
142 Err("invalid incr")
143 } else {
144 parse_delta(&extras, &mut req);
145 Ok(())
146 }
147 }
148 Op::Touch => {
149 if key_len == 0 || extras_len != 4 || !value.is_empty() {
150 Err("invalid touch")
151 } else {
152 req.exptime =
153 Some(
154 u32::from_be_bytes([extras[0], extras[1], extras[2], extras[3]]) as i64,
155 );
156 Ok(())
157 }
158 }
159 Op::Stats => {
160 if extras_len != 0 {
161 Err("invalid stats")
162 } else {
163 if key_len > 0 {
164 req.key = Some(key);
165 }
166 Ok(())
167 }
168 }
169 Op::Version | Op::Noop | Op::Quit => {
170 if extras_len != 0 || key_len != 0 || !value.is_empty() {
171 Err("invalid command")
172 } else {
173 Ok(())
174 }
175 }
176 Op::Gets | Op::Gat | Op::Gats | Op::Cas => Ok(()),
177 Op::Unknown
178 | Op::MetaGet
179 | Op::MetaSet
180 | Op::MetaDelete
181 | Op::MetaArithmetic
182 | Op::MetaDebug
183 | Op::MetaNoop => Ok(()),
184 };
185
186 if result.is_err() {
187 let err = Error::client("invalid arguments");
188 return Some(DecodeOutcome::Response(meta, Response::Error(err)));
189 }
190
191 Some(DecodeOutcome::Request(req, meta))
192 }
193}
194
195fn parse_flags_exptime(extras: &Bytes, req: &mut Request) {
196 if extras.len() < 8 {
197 return;
198 }
199 req.flags = Some(u32::from_be_bytes([
200 extras[0], extras[1], extras[2], extras[3],
201 ]));
202 req.exptime = Some(u32::from_be_bytes([extras[4], extras[5], extras[6], extras[7]]) as i64);
203}
204
205fn parse_delta(extras: &Bytes, req: &mut Request) {
206 if extras.len() < 20 {
207 return;
208 }
209 req.delta = Some(u64::from_be_bytes([
210 extras[0], extras[1], extras[2], extras[3], extras[4], extras[5], extras[6], extras[7],
211 ]));
212 req.initial = Some(u64::from_be_bytes([
213 extras[8], extras[9], extras[10], extras[11], extras[12], extras[13], extras[14],
214 extras[15],
215 ]));
216 req.exptime = Some(u32::from_be_bytes([extras[16], extras[17], extras[18], extras[19]]) as i64);
217}
218
219fn opcode_to_op(opcode: u8) -> (Op, bool, bool) {
220 match opcode {
221 0x00 => (Op::Get, false, false),
222 0x09 => (Op::Get, true, false),
223 0x0c => (Op::Get, false, true),
224 0x0d => (Op::Get, true, true),
225 0x01 => (Op::Set, false, false),
226 0x11 => (Op::Set, true, false),
227 0x02 => (Op::Add, false, false),
228 0x12 => (Op::Add, true, false),
229 0x03 => (Op::Replace, false, false),
230 0x13 => (Op::Replace, true, false),
231 0x04 => (Op::Delete, false, false),
232 0x14 => (Op::Delete, true, false),
233 0x05 => (Op::Incr, false, false),
234 0x15 => (Op::Incr, true, false),
235 0x06 => (Op::Decr, false, false),
236 0x16 => (Op::Decr, true, false),
237 0x07 => (Op::Quit, false, false),
238 0x17 => (Op::Quit, true, false),
239 0x0a => (Op::Noop, false, false),
240 0x0b => (Op::Version, false, false),
241 0x0e => (Op::Append, false, false),
242 0x19 => (Op::Append, true, false),
243 0x0f => (Op::Prepend, false, false),
244 0x1a => (Op::Prepend, true, false),
245 0x10 => (Op::Stats, false, false),
246 _ => (Op::Unknown, false, false),
247 }
248}
249
250fn is_valid_key(key: &Bytes) -> bool {
251 if key.is_empty() || key.len() > 250 {
252 return false;
253 }
254 for &b in key.as_ref() {
255 if b <= b' ' || b == 0x7f {
256 return false;
257 }
258 }
259 true
260}
261
262pub const STATUS_SUCCESS: u16 = 0x0000;
263pub const STATUS_KEY_NOT_FOUND: u16 = 0x0001;
264pub const STATUS_KEY_EXISTS: u16 = 0x0002;
265pub const STATUS_ITEM_NOT_STORED: u16 = 0x0005;
266pub const STATUS_INVALID_ARGUMENTS: u16 = 0x0004;
267pub const STATUS_UNKNOWN_COMMAND: u16 = 0x0081;
268pub const STATUS_INTERNAL_ERROR: u16 = 0x0084;
269
270pub fn encode_binary_response(
271 meta: RequestMeta,
272 response: &Response,
273 out: &mut BytesMut,
274 return_key: bool,
275) -> (u16, usize) {
276 let opcode = meta.opcode;
277 let opaque = meta.opaque.unwrap_or(0);
278
279 match response {
280 Response::Stored | Response::Deleted | Response::Touched | Response::Noop => {
281 encode_header(
282 out,
283 HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
284 );
285 (STATUS_SUCCESS, 24)
286 }
287 Response::NotStored => {
288 encode_header(
289 out,
290 HeaderFields::new(opcode, 0, 0, STATUS_ITEM_NOT_STORED, 0, opaque, 0),
291 );
292 (STATUS_ITEM_NOT_STORED, 24)
293 }
294 Response::Exists => {
295 encode_header(
296 out,
297 HeaderFields::new(opcode, 0, 0, STATUS_KEY_EXISTS, 0, opaque, 0),
298 );
299 (STATUS_KEY_EXISTS, 24)
300 }
301 Response::NotFound => {
302 encode_header(
303 out,
304 HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
305 );
306 (STATUS_KEY_NOT_FOUND, 24)
307 }
308 Response::Numeric(value) => {
309 let extras_len = 0u8;
310 let key_len = 0u16;
311 let body_len = 8u32;
312 encode_header(
313 out,
314 HeaderFields::new(
315 opcode,
316 extras_len,
317 key_len,
318 STATUS_SUCCESS,
319 body_len,
320 opaque,
321 0,
322 ),
323 );
324 out.extend_from_slice(&value.to_be_bytes());
325 (STATUS_SUCCESS, 24 + 8)
326 }
327 Response::Value(entry) => {
328 let extras_len = 4u8;
329 let key = if return_key { entry.key.as_ref() } else { b"" };
330 let key_len = key.len() as u16;
331 let body_len = extras_len as u32 + key_len as u32 + entry.value.len() as u32;
332 encode_header(
333 out,
334 HeaderFields::new(
335 opcode,
336 extras_len,
337 key_len,
338 STATUS_SUCCESS,
339 body_len,
340 opaque,
341 entry.cas.unwrap_or(0),
342 ),
343 );
344 out.extend_from_slice(&entry.flags.to_be_bytes());
345 if return_key {
346 out.extend_from_slice(key);
347 }
348 out.extend_from_slice(entry.value.as_ref());
349 (STATUS_SUCCESS, 24 + body_len as usize)
350 }
351 Response::Values(entries) => {
352 if let Some(entry) = entries.first() {
353 encode_binary_response(meta, &Response::Value(entry.clone()), out, return_key)
354 } else {
355 encode_header(
356 out,
357 HeaderFields::new(opcode, 0, 0, STATUS_KEY_NOT_FOUND, 0, opaque, 0),
358 );
359 (STATUS_KEY_NOT_FOUND, 24)
360 }
361 }
362 Response::Stats(lines) => {
363 let mut total = 0usize;
364 for line in lines {
365 total += encode_stat_line(meta, line, out);
366 }
367 total += encode_header(
368 out,
369 HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, 0, opaque, 0),
370 );
371 (STATUS_SUCCESS, total)
372 }
373 Response::Version(version) => {
374 let body_len = version.len() as u32;
375 encode_header(
376 out,
377 HeaderFields::new(opcode, 0, 0, STATUS_SUCCESS, body_len, opaque, 0),
378 );
379 out.extend_from_slice(version.as_ref());
380 (STATUS_SUCCESS, 24 + version.len())
381 }
382 Response::Error(err) => {
383 let status = match err.kind {
384 crate::error::ErrorKind::UnknownCommand => STATUS_UNKNOWN_COMMAND,
385 crate::error::ErrorKind::Client => STATUS_INVALID_ARGUMENTS,
386 crate::error::ErrorKind::Server => STATUS_INTERNAL_ERROR,
387 };
388 let body_len = err.message.len() as u32;
389 encode_header(
390 out,
391 HeaderFields::new(opcode, 0, 0, status, body_len, opaque, 0),
392 );
393 out.extend_from_slice(err.message.as_ref());
394 (status, 24 + err.message.len())
395 }
396 Response::Meta(_) | Response::ValuesStream(_) | Response::StatsStream(_) => {
397 encode_header(
398 out,
399 HeaderFields::new(opcode, 0, 0, STATUS_INTERNAL_ERROR, 0, opaque, 0),
400 );
401 (STATUS_INTERNAL_ERROR, 24)
402 }
403 }
404}
405
406struct HeaderFields {
407 opcode: u8,
408 extras_len: u8,
409 key_len: u16,
410 status: u16,
411 body_len: u32,
412 opaque: u32,
413 cas: u64,
414}
415
416impl HeaderFields {
417 fn new(
418 opcode: u8,
419 extras_len: u8,
420 key_len: u16,
421 status: u16,
422 body_len: u32,
423 opaque: u32,
424 cas: u64,
425 ) -> Self {
426 Self {
427 opcode,
428 extras_len,
429 key_len,
430 status,
431 body_len,
432 opaque,
433 cas,
434 }
435 }
436}
437
438fn encode_header(out: &mut BytesMut, header: HeaderFields) -> usize {
439 let HeaderFields {
440 opcode,
441 extras_len,
442 key_len,
443 status,
444 body_len,
445 opaque,
446 cas,
447 } = header;
448 out.extend_from_slice(&[
449 0x81,
450 opcode,
451 (key_len >> 8) as u8,
452 (key_len & 0xff) as u8,
453 extras_len,
454 0x00,
455 (status >> 8) as u8,
456 (status & 0xff) as u8,
457 ((body_len >> 24) & 0xff) as u8,
458 ((body_len >> 16) & 0xff) as u8,
459 ((body_len >> 8) & 0xff) as u8,
460 (body_len & 0xff) as u8,
461 ((opaque >> 24) & 0xff) as u8,
462 ((opaque >> 16) & 0xff) as u8,
463 ((opaque >> 8) & 0xff) as u8,
464 (opaque & 0xff) as u8,
465 ((cas >> 56) & 0xff) as u8,
466 ((cas >> 48) & 0xff) as u8,
467 ((cas >> 40) & 0xff) as u8,
468 ((cas >> 32) & 0xff) as u8,
469 ((cas >> 24) & 0xff) as u8,
470 ((cas >> 16) & 0xff) as u8,
471 ((cas >> 8) & 0xff) as u8,
472 (cas & 0xff) as u8,
473 ]);
474 24
475}
476
477fn encode_stat_line(meta: RequestMeta, line: &StatLine, out: &mut BytesMut) -> usize {
478 let opcode = meta.opcode;
479 let opaque = meta.opaque.unwrap_or(0);
480 let key_len = line.key.len() as u16;
481 let body_len = key_len as u32 + line.value.len() as u32;
482 encode_header(
483 out,
484 HeaderFields::new(opcode, 0, key_len, STATUS_SUCCESS, body_len, opaque, 0),
485 );
486 out.extend_from_slice(line.key.as_ref());
487 out.extend_from_slice(line.value.as_ref());
488 24 + body_len as usize
489}
490
491#[cfg(test)]
492mod tests {
493 use super::*;
494 use bytes::BytesMut;
495
496 #[test]
497 fn decode_get_request() {
498 let key = b"foo";
499 let mut buf = BytesMut::with_capacity(24 + key.len());
500 buf.extend_from_slice(&[
501 0x80,
502 0x00, 0x00,
504 key.len() as u8, 0x00, 0x00, 0x00,
508 0x00, 0x00,
510 0x00,
511 0x00,
512 key.len() as u8, 0xde,
514 0xad,
515 0xbe,
516 0xef, 0x00,
518 0x00,
519 0x00,
520 0x00,
521 0x00,
522 0x00,
523 0x00,
524 0x00, ]);
526 buf.extend_from_slice(key);
527 let mut decoder = BinaryDecoder::new();
528 let outcome = decoder.decode(&mut buf, BinaryLimits::default());
529 let (req, meta) = match outcome {
530 Some(DecodeOutcome::Request(req, meta)) => (req, meta),
531 _ => panic!("unexpected outcome"),
532 };
533 assert_eq!(req.op, Op::Get);
534 assert_eq!(req.key.unwrap(), Bytes::from_static(b"foo"));
535 assert_eq!(meta.protocol, Protocol::Binary);
536 }
537
538 #[test]
539 fn decode_invalid_magic() {
540 let mut buf = BytesMut::with_capacity(24);
541 buf.extend_from_slice(&[
542 0x81, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ]);
551 let mut decoder = BinaryDecoder::new();
552 let outcome = decoder.decode(&mut buf, BinaryLimits::default());
553 match outcome {
554 Some(DecodeOutcome::Response(_, Response::Error(_))) => {}
555 _ => panic!("unexpected outcome"),
556 }
557 }
558}