1use bytes::{Buf, BufMut, Bytes, BytesMut};
2
3use crate::codec::DecodeOutcome;
4use crate::error::{Error, ErrorKind};
5use crate::response::Response;
6use crate::types::{
7 MetaCode, MetaFlags, MetaResponse, Op, Protocol, ReplyMode, Request, RequestMeta, StatLine,
8 ValueEntry,
9};
10
/// Size limits applied while decoding the ASCII protocol.
#[derive(Debug, Clone, Copy)]
pub struct AsciiLimits {
    /// Longest accepted command line in bytes, not counting the CRLF terminator.
    pub max_line_len: usize,
    /// Largest accepted data block in bytes, not counting the CRLF terminator.
    pub max_blob_len: usize,
}
17
18impl Default for AsciiLimits {
19 fn default() -> Self {
20 Self {
21 max_line_len: 4096,
22 max_blob_len: 1 << 20,
23 }
24 }
25}
26
/// A parsed command that is still waiting for its data block.
#[derive(Debug, Clone)]
struct BlobState {
    /// Number of payload bytes expected, not counting the trailing CRLF.
    len: usize,
    /// The request parsed from the command line; its value is filled in once
    /// the payload has been read.
    req: Request,
    /// Metadata produced while parsing the command line.
    meta: RequestMeta,
}
33
/// What the decoder expects to read next.
#[derive(Debug, Clone)]
enum AsciiState {
    /// A CRLF-terminated command line.
    Line,
    /// The data block belonging to an already-parsed command.
    Blob(Box<BlobState>),
}
39
/// Incremental decoder for the ASCII (and meta) text protocol.
#[derive(Debug, Clone)]
pub struct AsciiDecoder {
    /// Whether the next bytes are a command line or a pending data block.
    state: AsciiState,
    /// Set once a line has exceeded the length limit without a CRLF being
    /// seen; input is then skipped until the next CRLF.
    discarding_line: bool,
}
46
47impl Default for AsciiDecoder {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl AsciiDecoder {
54 pub fn new() -> Self {
55 Self {
56 state: AsciiState::Line,
57 discarding_line: false,
58 }
59 }
60
61 pub fn decode(&mut self, buf: &mut BytesMut, limits: AsciiLimits) -> Option<DecodeOutcome> {
62 let state = std::mem::replace(&mut self.state, AsciiState::Line);
63 match state {
64 AsciiState::Line => {
65 self.state = AsciiState::Line;
66 self.decode_line(buf, limits)
67 }
68 AsciiState::Blob(blob) => {
69 let BlobState { len, mut req, meta } = *blob;
70 if buf.len() < len + 2 {
71 self.state = AsciiState::Blob(Box::new(BlobState { len, req, meta }));
72 return None;
73 }
74 let payload = buf.split_to(len).freeze();
75 let cr = buf.get_u8();
76 let lf = buf.get_u8();
77 if cr != b'\r' || lf != b'\n' {
78 let response = Response::Error(Error::client("bad data chunk"));
79 self.state = AsciiState::Line;
80 return Some(DecodeOutcome::Response(meta, response));
81 }
82 req.value = Some(payload);
83 self.state = AsciiState::Line;
84 Some(DecodeOutcome::Request(req, meta))
85 }
86 }
87 }
88
89 fn decode_line(&mut self, buf: &mut BytesMut, limits: AsciiLimits) -> Option<DecodeOutcome> {
90 if self.discarding_line {
91 if let Some(pos) = find_crlf(buf) {
92 buf.advance(pos + 2);
93 self.discarding_line = false;
94 let meta = RequestMeta::ascii();
95 let response = Response::Error(Error::client("line too long"));
96 return Some(DecodeOutcome::Response(meta, response));
97 }
98 if buf.len() > limits.max_line_len {
99 buf.clear();
100 }
101 return None;
102 }
103
104 let pos = match find_crlf(buf) {
105 Some(pos) => pos,
106 None => {
107 if buf.len() > limits.max_line_len {
108 self.discarding_line = true;
109 }
110 return None;
111 }
112 };
113
114 if pos > limits.max_line_len {
115 buf.advance(pos + 2);
116 let meta = RequestMeta::ascii();
117 let response = Response::Error(Error::client("line too long"));
118 return Some(DecodeOutcome::Response(meta, response));
119 }
120
121 let line = buf.split_to(pos).freeze();
122 buf.advance(2);
123
124 if line.is_empty() {
125 let meta = RequestMeta::ascii();
126 let response = Response::Error(Error::client("empty command"));
127 return Some(DecodeOutcome::Response(meta, response));
128 }
129
130 match parse_line(line, limits.max_blob_len) {
131 Ok(LineParse::Line(req, meta)) => Some(DecodeOutcome::Request(req, meta)),
132 Ok(LineParse::Blob { len, mut req, meta }) => {
133 if buf.len() >= len + 2 {
134 let payload = buf.split_to(len).freeze();
135 let cr = buf.get_u8();
136 let lf = buf.get_u8();
137 if cr != b'\r' || lf != b'\n' {
138 let response = Response::Error(Error::client("bad data chunk"));
139 return Some(DecodeOutcome::Response(meta, response));
140 }
141 req.value = Some(payload);
142 return Some(DecodeOutcome::Request(req, meta));
143 }
144 self.state = AsciiState::Blob(Box::new(BlobState { len, req, meta }));
145 None
146 }
147 Err(err) => Some(DecodeOutcome::Response(
148 RequestMeta::ascii(),
149 Response::Error(err),
150 )),
151 }
152 }
153}
154
/// Result of parsing a single command line.
enum LineParse {
    /// The command is complete; no data block follows.
    Line(Request, RequestMeta),
    /// The command is followed by a data block of `len` bytes.
    Blob {
        len: usize,
        req: Request,
        meta: RequestMeta,
    },
}
163
164fn parse_line(line: Bytes, max_blob_len: usize) -> Result<LineParse, Error> {
165 let tokens = split_tokens(&line);
166 if tokens.is_empty() {
167 return Err(Error::client("empty command"));
168 }
169
170 let cmd = tokens[0].as_ref();
171 let op = parse_op(cmd);
172 let mut meta = RequestMeta::ascii();
173 let mut req = Request::new(op);
174
175 match op {
176 Op::Get | Op::Gets => {
177 if tokens.len() < 2 {
178 return Err(Error::client("missing key"));
179 }
180 let keys = parse_keys(&tokens[1..])?;
181 if keys.len() == 1 {
182 req.key = Some(keys[0].clone());
183 } else {
184 req.keys = keys;
185 }
186 }
187 Op::Gat | Op::Gats => {
188 if tokens.len() < 3 {
189 return Err(Error::client("missing exptime or key"));
190 }
191 let exptime =
192 parse_i64(tokens[1].as_ref()).ok_or_else(|| Error::client("invalid exptime"))?;
193 req.exptime = Some(exptime);
194 let keys = parse_keys(&tokens[2..])?;
195 if keys.len() == 1 {
196 req.key = Some(keys[0].clone());
197 } else {
198 req.keys = keys;
199 }
200 }
201 Op::Set | Op::Add | Op::Replace | Op::Append | Op::Prepend | Op::Cas => {
202 let min = if matches!(op, Op::Cas) { 6 } else { 5 };
203 if tokens.len() < min {
204 return Err(Error::client("missing arguments"));
205 }
206 let mut end = tokens.len();
207 let mut reply = ReplyMode::Always;
208 if tokens.len() > min && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
209 reply = ReplyMode::SuppressSuccess;
210 end -= 1;
211 }
212 if end != min {
213 return Err(Error::client("invalid arguments"));
214 }
215 let key = tokens[1].clone();
216 validate_key(&key)?;
217 req.key = Some(key);
218 req.flags =
219 Some(parse_u32(tokens[2].as_ref()).ok_or_else(|| Error::client("invalid flags"))?);
220 req.exptime = Some(
221 parse_i64(tokens[3].as_ref()).ok_or_else(|| Error::client("invalid exptime"))?,
222 );
223 let bytes =
224 parse_usize(tokens[4].as_ref()).ok_or_else(|| Error::client("invalid bytes"))?;
225 if bytes > max_blob_len {
226 return Err(Error::server("value too large"));
227 }
228 if matches!(op, Op::Cas) {
229 req.cas = Some(
230 parse_u64(tokens[5].as_ref()).ok_or_else(|| Error::client("invalid cas"))?,
231 );
232 }
233 meta.reply = reply;
234 return Ok(LineParse::Blob {
235 len: bytes,
236 req,
237 meta,
238 });
239 }
240 Op::Delete => {
241 if tokens.len() < 2 {
242 return Err(Error::client("missing key"));
243 }
244 let mut reply = ReplyMode::Always;
245 if tokens.len() > 2 && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
246 reply = ReplyMode::SuppressSuccess;
247 if tokens.len() != 3 {
248 return Err(Error::client("invalid arguments"));
249 }
250 } else if tokens.len() != 2 {
251 return Err(Error::client("invalid arguments"));
252 }
253 let key = tokens[1].clone();
254 validate_key(&key)?;
255 req.key = Some(key);
256 meta.reply = reply;
257 }
258 Op::Flush => {
259 let mut reply = ReplyMode::Always;
260 let mut delay: Option<i64> = None;
261 match tokens.len() {
262 1 => {}
263 2 => {
264 if tokens[1].as_ref() == b"noreply" {
265 reply = ReplyMode::SuppressSuccess;
266 } else {
267 delay = Some(
268 parse_i64(tokens[1].as_ref())
269 .ok_or_else(|| Error::client("invalid exptime"))?,
270 );
271 }
272 }
273 3 => {
274 if tokens[2].as_ref() != b"noreply" {
275 return Err(Error::client("invalid arguments"));
276 }
277 reply = ReplyMode::SuppressSuccess;
278 delay = Some(
279 parse_i64(tokens[1].as_ref())
280 .ok_or_else(|| Error::client("invalid exptime"))?,
281 );
282 }
283 _ => return Err(Error::client("invalid arguments")),
284 }
285 if let Some(value) = delay {
286 if value < 0 {
287 return Err(Error::client("invalid exptime"));
288 }
289 req.exptime = Some(value);
290 }
291 meta.reply = reply;
292 }
293 Op::Incr | Op::Decr => {
294 if tokens.len() < 3 {
295 return Err(Error::client("missing arguments"));
296 }
297 let mut reply = ReplyMode::Always;
298 if tokens.len() > 3 && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
299 reply = ReplyMode::SuppressSuccess;
300 if tokens.len() != 4 {
301 return Err(Error::client("invalid arguments"));
302 }
303 } else if tokens.len() != 3 {
304 return Err(Error::client("invalid arguments"));
305 }
306 let key = tokens[1].clone();
307 validate_key(&key)?;
308 req.key = Some(key);
309 req.delta =
310 Some(parse_u64(tokens[2].as_ref()).ok_or_else(|| Error::client("invalid delta"))?);
311 meta.reply = reply;
312 }
313 Op::Touch => {
314 if tokens.len() < 3 {
315 return Err(Error::client("missing arguments"));
316 }
317 let mut reply = ReplyMode::Always;
318 if tokens.len() > 3 && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
319 reply = ReplyMode::SuppressSuccess;
320 if tokens.len() != 4 {
321 return Err(Error::client("invalid arguments"));
322 }
323 } else if tokens.len() != 3 {
324 return Err(Error::client("invalid arguments"));
325 }
326 let key = tokens[1].clone();
327 validate_key(&key)?;
328 req.key = Some(key);
329 req.exptime = Some(
330 parse_i64(tokens[2].as_ref()).ok_or_else(|| Error::client("invalid exptime"))?,
331 );
332 meta.reply = reply;
333 }
334 Op::Stats => {
335 if tokens.len() > 1 {
336 let keys = parse_keys(&tokens[1..])?;
337 req.keys = keys;
338 }
339 }
340 Op::Version | Op::Quit => {
341 if tokens.len() != 1 {
342 return Err(Error::client("invalid arguments"));
343 }
344 }
345 Op::MetaGet
346 | Op::MetaSet
347 | Op::MetaDelete
348 | Op::MetaArithmetic
349 | Op::MetaDebug
350 | Op::MetaNoop => {
351 meta.protocol = Protocol::Meta;
352 if op == Op::MetaNoop {
353 if tokens.len() != 1 {
354 return Err(Error::client("invalid arguments"));
355 }
356 return Ok(LineParse::Line(req, meta));
357 }
358 if tokens.len() < 2 {
359 return Err(Error::client("missing key"));
360 }
361 let mut idx = 2;
362 let mut datalen = None;
363 if op == Op::MetaSet {
364 if tokens.len() < 3 {
365 return Err(Error::client("missing data length"));
366 }
367 datalen = Some(
368 parse_usize(tokens[2].as_ref())
369 .ok_or_else(|| Error::client("invalid data length"))?,
370 );
371 idx = 3;
372 }
373 let meta_flags = parse_meta_flags(&tokens[idx..]);
374 if meta_flags.has(b'q') {
375 meta.reply = if op == Op::MetaGet {
376 ReplyMode::SuppressMiss
377 } else {
378 ReplyMode::SuppressSuccess
379 };
380 }
381 let mut key = tokens[1].clone();
382 if meta_flags.has(b'b') {
383 let decoded = decode_base64(key.as_ref())?;
384 if decoded.len() > 250 {
385 return Err(Error::client("key too long"));
386 }
387 key = Bytes::from(decoded);
388 } else {
389 validate_key(&key)?;
390 }
391 req.key = Some(key);
392 if let Some(delta) = meta_flags
393 .token(b'D')
394 .and_then(|token| parse_u64(token.as_ref()))
395 {
396 req.delta = Some(delta);
397 }
398 if let Some(initial) = meta_flags
399 .token(b'J')
400 .and_then(|token| parse_u64(token.as_ref()))
401 {
402 req.initial = Some(initial);
403 }
404 if let Some(cas) = meta_flags
405 .token(b'C')
406 .and_then(|token| parse_u64(token.as_ref()))
407 {
408 req.cas = Some(cas);
409 }
410 req.meta = Some(meta_flags.clone());
411 if op == Op::MetaSet {
412 let mut length = datalen.unwrap_or(0);
413 if let Some(slen) = meta_flags
414 .token(b'S')
415 .and_then(|token| parse_usize(token.as_ref()))
416 {
417 length = slen;
418 }
419 if length > max_blob_len {
420 return Err(Error::server("value too large"));
421 }
422 if length == 0 {
423 return Ok(LineParse::Line(req, meta));
424 }
425 return Ok(LineParse::Blob {
426 len: length,
427 req,
428 meta,
429 });
430 }
431 }
432 Op::SaslListMechs | Op::SaslAuth | Op::SaslStep | Op::Unknown => {
433 req.op = Op::Unknown;
434 }
435 Op::Noop => {}
436 }
437
438 Ok(LineParse::Line(req, meta))
439}
440
441fn parse_op(cmd: &[u8]) -> Op {
442 match cmd {
443 b"get" => Op::Get,
444 b"gets" => Op::Gets,
445 b"gat" => Op::Gat,
446 b"gats" => Op::Gats,
447 b"set" => Op::Set,
448 b"add" => Op::Add,
449 b"replace" => Op::Replace,
450 b"append" => Op::Append,
451 b"prepend" => Op::Prepend,
452 b"cas" => Op::Cas,
453 b"delete" => Op::Delete,
454 b"flush_all" => Op::Flush,
455 b"incr" => Op::Incr,
456 b"decr" => Op::Decr,
457 b"touch" => Op::Touch,
458 b"stats" => Op::Stats,
459 b"version" => Op::Version,
460 b"quit" => Op::Quit,
461 b"mg" => Op::MetaGet,
462 b"ms" => Op::MetaSet,
463 b"md" => Op::MetaDelete,
464 b"ma" => Op::MetaArithmetic,
465 b"me" => Op::MetaDebug,
466 b"mn" => Op::MetaNoop,
467 _ => Op::Unknown,
468 }
469}
470
471fn parse_keys(tokens: &[Bytes]) -> Result<Vec<Bytes>, Error> {
472 let mut keys = Vec::with_capacity(tokens.len());
473 for token in tokens {
474 validate_key(token)?;
475 keys.push(token.clone());
476 }
477 Ok(keys)
478}
479
480fn validate_key(key: &Bytes) -> Result<(), Error> {
481 if key.is_empty() {
482 return Err(Error::client("empty key"));
483 }
484 if key.len() > 250 {
485 return Err(Error::client("key too long"));
486 }
487 for &b in key.as_ref() {
488 if b <= b' ' || b == 0x7f {
489 return Err(Error::client("invalid key"));
490 }
491 }
492 Ok(())
493}
494
495fn split_tokens(line: &Bytes) -> Vec<Bytes> {
496 let mut tokens = Vec::new();
497 let mut start = 0;
498 let bytes = line.as_ref();
499 while start < bytes.len() {
500 while start < bytes.len() && bytes[start] == b' ' {
501 start += 1;
502 }
503 if start >= bytes.len() {
504 break;
505 }
506 let mut end = start;
507 while end < bytes.len() && bytes[end] != b' ' {
508 end += 1;
509 }
510 tokens.push(line.slice(start..end));
511 start = end;
512 }
513 tokens
514}
515
516fn find_crlf(buf: &BytesMut) -> Option<usize> {
517 let bytes = buf.as_ref();
518 if bytes.len() < 2 {
519 return None;
520 }
521 let mut i = 0;
522 while i + 1 < bytes.len() {
523 if bytes[i] == b'\r' && bytes[i + 1] == b'\n' {
524 return Some(i);
525 }
526 i += 1;
527 }
528 None
529}
530
531fn parse_u32(token: &[u8]) -> Option<u32> {
532 parse_u64(token).and_then(|value| u32::try_from(value).ok())
533}
534
535fn parse_usize(token: &[u8]) -> Option<usize> {
536 parse_u64(token).and_then(|value| usize::try_from(value).ok())
537}
538
/// Parses a non-empty run of ASCII digits as a `u64`.
///
/// Returns `None` for an empty token, any non-digit byte (including a sign),
/// or a value that overflows `u64`.
fn parse_u64(token: &[u8]) -> Option<u64> {
    if token.is_empty() {
        return None;
    }
    token.iter().try_fold(0u64, |acc, &byte| {
        if !byte.is_ascii_digit() {
            return None;
        }
        acc.checked_mul(10)?.checked_add(u64::from(byte - b'0'))
    })
}
553
/// Parses an optionally `-`-prefixed run of ASCII digits as an `i64`.
///
/// Returns `None` for an empty token, a lone `-`, any non-digit byte, or a
/// value outside the `i64` range. Out-of-range values are rejected rather
/// than wrapped.
fn parse_i64(token: &[u8]) -> Option<i64> {
    let (negative, digits) = match token.split_first() {
        Some((b'-', rest)) => (true, rest),
        Some(_) => (false, token),
        None => return None,
    };
    if digits.is_empty() {
        return None;
    }

    // Accumulate as a negative number: the negative range is one larger than
    // the positive one, so `i64::MIN` can be represented without overflow.
    let mut acc: i64 = 0;
    for &byte in digits {
        if !byte.is_ascii_digit() {
            return None;
        }
        acc = acc.checked_mul(10)?.checked_sub(i64::from(byte - b'0'))?;
    }

    if negative {
        Some(acc)
    } else {
        // Fails only when the magnitude is `i64::MAX + 1`.
        acc.checked_neg()
    }
}
566
567fn parse_meta_flags(tokens: &[Bytes]) -> MetaFlags {
568 let mut ordered = Vec::with_capacity(tokens.len());
569 for token in tokens {
570 if token.is_empty() {
571 continue;
572 }
573 let code = token.as_ref()[0];
574 let rest = if token.len() > 1 {
575 Some(token.slice(1..))
576 } else {
577 None
578 };
579 ordered.push(crate::types::MetaFlag { code, token: rest });
580 }
581 MetaFlags::new(ordered)
582}
583
584fn decode_base64(input: &[u8]) -> Result<Vec<u8>, Error> {
585 if !input.len().is_multiple_of(4) {
586 return Err(Error::client("invalid base64"));
587 }
588 let mut out = Vec::with_capacity(input.len() / 4 * 3);
589 let mut i = 0;
590 while i < input.len() {
591 let a = decode_base64_val(input[i])?;
592 let b = decode_base64_val(input[i + 1])?;
593 let c = input[i + 2];
594 let d = input[i + 3];
595 let c_val = if c == b'=' {
596 None
597 } else {
598 Some(decode_base64_val(c)?)
599 };
600 let d_val = if d == b'=' {
601 None
602 } else {
603 Some(decode_base64_val(d)?)
604 };
605
606 out.push((a << 2) | (b >> 4));
607 if let Some(c_val) = c_val {
608 out.push((b << 4) | (c_val >> 2));
609 if let Some(d_val) = d_val {
610 out.push((c_val << 6) | d_val);
611 }
612 }
613 i += 4;
614 }
615 Ok(out)
616}
617
618fn decode_base64_val(byte: u8) -> Result<u8, Error> {
619 match byte {
620 b'A'..=b'Z' => Ok(byte - b'A'),
621 b'a'..=b'z' => Ok(byte - b'a' + 26),
622 b'0'..=b'9' => Ok(byte - b'0' + 52),
623 b'+' => Ok(62),
624 b'/' => Ok(63),
625 b'=' => Ok(0),
626 _ => Err(Error::client("invalid base64")),
627 }
628}
629
630pub fn should_suppress_ascii(meta: RequestMeta, response: &Response) -> bool {
631 match meta.reply {
632 ReplyMode::Always => false,
633 ReplyMode::SuppressMiss => {
634 if let Response::Meta(meta_resp) = response {
635 meta_resp.code.is_miss()
636 } else {
637 false
638 }
639 }
640 ReplyMode::SuppressSuccess => !matches!(response, Response::Error(_)),
641 ReplyMode::QuietBuffered => false,
642 }
643}
644
645pub fn should_suppress_meta(meta: RequestMeta, response: &Response) -> bool {
646 match meta.reply {
647 ReplyMode::Always => false,
648 ReplyMode::SuppressMiss => {
649 if let Response::Meta(meta_resp) = response {
650 meta_resp.code.is_miss()
651 } else {
652 false
653 }
654 }
655 ReplyMode::SuppressSuccess => {
656 if let Response::Meta(meta_resp) = response {
657 meta_resp.code.is_success()
658 } else {
659 false
660 }
661 }
662 ReplyMode::QuietBuffered => false,
663 }
664}
665
666pub fn encode_ascii_response(
667 req: &Request,
668 meta: RequestMeta,
669 response: &Response,
670 out: &mut BytesMut,
671) -> bool {
672 if should_suppress_ascii(meta, response) {
673 return false;
674 }
675 match response {
676 Response::Stored => out.extend_from_slice(b"STORED\r\n"),
677 Response::NotStored => out.extend_from_slice(b"NOT_STORED\r\n"),
678 Response::Exists => out.extend_from_slice(b"EXISTS\r\n"),
679 Response::NotFound => out.extend_from_slice(b"NOT_FOUND\r\n"),
680 Response::Deleted => out.extend_from_slice(b"DELETED\r\n"),
681 Response::Touched => out.extend_from_slice(b"TOUCHED\r\n"),
682 Response::Ok => out.extend_from_slice(b"OK\r\n"),
683 Response::Numeric(value) => {
684 write_u64(out, *value);
685 out.extend_from_slice(b"\r\n");
686 }
687 Response::Value(entry) => {
688 let include_cas = matches!(req.op, Op::Gets | Op::Gats);
689 encode_value_entry(entry, include_cas, out);
690 out.extend_from_slice(b"END\r\n");
691 }
692 Response::Values(entries) => {
693 let include_cas = matches!(req.op, Op::Gets | Op::Gats);
694 for entry in entries {
695 encode_value_entry(entry, include_cas, out);
696 }
697 out.extend_from_slice(b"END\r\n");
698 }
699 Response::Stats(lines) => {
700 for line in lines {
701 encode_stat_line(line, out);
702 }
703 out.extend_from_slice(b"END\r\n");
704 }
705 Response::Version(version) => {
706 out.extend_from_slice(b"VERSION ");
707 out.extend_from_slice(version);
708 out.extend_from_slice(b"\r\n");
709 }
710 Response::Noop => {
711 if req.op == Op::MetaNoop {
712 out.extend_from_slice(b"MN\r\n");
713 }
714 }
715 Response::Error(err) => match err.kind {
716 ErrorKind::UnknownCommand => out.extend_from_slice(b"ERROR\r\n"),
717 ErrorKind::Client | ErrorKind::Auth => {
718 out.extend_from_slice(b"CLIENT_ERROR ");
719 out.extend_from_slice(err.message.as_ref());
720 out.extend_from_slice(b"\r\n");
721 }
722 ErrorKind::Server => {
723 out.extend_from_slice(b"SERVER_ERROR ");
724 out.extend_from_slice(err.message.as_ref());
725 out.extend_from_slice(b"\r\n");
726 }
727 },
728 Response::Meta(_) | Response::ValuesStream(_) | Response::StatsStream(_) => {}
729 }
730 true
731}
732
733pub fn encode_meta_response(
734 req: &Request,
735 meta: RequestMeta,
736 response: &MetaResponse,
737 out: &mut BytesMut,
738) {
739 if should_suppress_meta(meta, &Response::Meta(response.clone())) {
740 return;
741 }
742 out.extend_from_slice(response.code.as_bytes());
743 if response.code == MetaCode::Va {
744 out.extend_from_slice(b" ");
745 let size = response
746 .size
747 .or_else(|| response.value.as_ref().map(|value| value.len()))
748 .unwrap_or(0);
749 write_usize(out, size);
750 }
751
752 if let Some(flags) = req.meta.as_ref() {
753 for flag in &flags.ordered {
754 append_meta_token(req, response, flag.code, out);
755 }
756 }
757
758 if let Some(win) = response.extra.won {
759 out.extend_from_slice(match win {
760 crate::types::WinState::Won => b" W",
761 crate::types::WinState::AlreadyWon => b" Z",
762 });
763 }
764 if response.extra.stale {
765 out.extend_from_slice(b" X");
766 }
767
768 out.extend_from_slice(b"\r\n");
769
770 let want_value = req
771 .meta
772 .as_ref()
773 .map(|flags| flags.has(b'v'))
774 .unwrap_or(false);
775 if want_value && let Some(value) = response.value.as_ref() {
776 out.extend_from_slice(value);
777 out.extend_from_slice(b"\r\n");
778 }
779}
780
781pub fn encode_meta_debug(
782 req: &Request,
783 lines: impl IntoIterator<Item = StatLine>,
784 out: &mut BytesMut,
785) {
786 let key = match req.key.as_ref() {
787 Some(key) => key.as_ref(),
788 None => b"",
789 };
790 out.extend_from_slice(b"ME ");
791 out.extend_from_slice(key);
792 for line in lines {
793 out.extend_from_slice(b" ");
794 out.extend_from_slice(line.key.as_ref());
795 out.extend_from_slice(b"=");
796 out.extend_from_slice(line.value.as_ref());
797 }
798 out.extend_from_slice(b"\r\n");
799}
800
801pub fn encode_value_entry(entry: &ValueEntry, include_cas: bool, out: &mut BytesMut) {
802 out.extend_from_slice(b"VALUE ");
803 out.extend_from_slice(entry.key.as_ref());
804 out.extend_from_slice(b" ");
805 write_u32(out, entry.flags);
806 out.extend_from_slice(b" ");
807 write_usize(out, entry.value.len());
808 if include_cas {
809 out.extend_from_slice(b" ");
810 write_u64(out, entry.cas.unwrap_or(0));
811 }
812 out.extend_from_slice(b"\r\n");
813 out.extend_from_slice(entry.value.as_ref());
814 out.extend_from_slice(b"\r\n");
815}
816
817pub fn encode_stat_line(line: &StatLine, out: &mut BytesMut) {
818 out.extend_from_slice(b"STAT ");
819 out.extend_from_slice(line.key.as_ref());
820 out.extend_from_slice(b" ");
821 out.extend_from_slice(line.value.as_ref());
822 out.extend_from_slice(b"\r\n");
823}
824
825fn append_meta_token(req: &Request, response: &MetaResponse, code: u8, out: &mut BytesMut) {
826 match code {
827 b'O' => {
828 if let Some(token) = req.meta.as_ref().and_then(|flags| flags.token(b'O')) {
829 out.extend_from_slice(b" O");
830 out.extend_from_slice(token.as_ref());
831 }
832 }
833 b'k' => {
834 if let Some(key) = req.key.as_ref() {
835 out.extend_from_slice(b" k");
836 out.extend_from_slice(key.as_ref());
837 }
838 }
839 b'c' => {
840 if let Some(cas) = response.cas {
841 out.extend_from_slice(b" c");
842 write_u64(out, cas);
843 }
844 }
845 b't' => {
846 if let Some(ttl) = response.ttl {
847 out.extend_from_slice(b" t");
848 write_i64(out, ttl);
849 }
850 }
851 b'f' => {
852 if let Some(flags) = response.flags {
853 out.extend_from_slice(b" f");
854 write_u32(out, flags);
855 }
856 }
857 b's' => {
858 if let Some(size) = response.size {
859 out.extend_from_slice(b" s");
860 write_usize(out, size);
861 }
862 }
863 b'h' => {
864 if let Some(hit) = response.hit {
865 out.extend_from_slice(b" h");
866 out.extend_from_slice(if hit { b"1" } else { b"0" });
867 }
868 }
869 b'l' => {
870 if let Some(last) = response.last_access {
871 out.extend_from_slice(b" l");
872 write_u64(out, last);
873 }
874 }
875 _ => {}
876 }
877}
878
879fn write_u32(out: &mut BytesMut, value: u32) {
880 write_u64(out, value as u64)
881}
882
883fn write_usize(out: &mut BytesMut, value: usize) {
884 write_u64(out, value as u64)
885}
886
887fn write_u64(out: &mut BytesMut, mut value: u64) {
888 let mut buf = [0u8; 20];
889 let mut i = buf.len();
890 if value == 0 {
891 out.put_u8(b'0');
892 return;
893 }
894 while value > 0 {
895 i -= 1;
896 buf[i] = b'0' + (value % 10) as u8;
897 value /= 10;
898 }
899 out.extend_from_slice(&buf[i..]);
900}
901
902fn write_i64(out: &mut BytesMut, value: i64) {
903 if value < 0 {
904 out.put_u8(b'-');
905 write_u64(out, (-value) as u64);
906 } else {
907 write_u64(out, value as u64);
908 }
909}
910
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;

    /// Feeds `input` to a fresh decoder with default limits and returns the
    /// decoded request, panicking on any other outcome.
    fn decode_request(input: &str) -> (Request, RequestMeta) {
        let mut decoder = AsciiDecoder::new();
        let mut buf = BytesMut::from(input);
        match decoder.decode(&mut buf, AsciiLimits::default()) {
            Some(DecodeOutcome::Request(req, meta)) => (req, meta),
            _ => panic!("unexpected decode outcome"),
        }
    }

    #[test]
    fn decode_set_with_value() {
        let (req, meta) = decode_request("set key 1 10 5\r\nhello\r\n");

        assert_eq!(req.op, Op::Set);
        assert_eq!(req.value.unwrap(), Bytes::from_static(b"hello"));
        assert_eq!(meta.reply, ReplyMode::Always);
    }

    #[test]
    fn decode_multi_get() {
        let (req, _) = decode_request("get k1 k2\r\n");

        assert_eq!(req.op, Op::Get);
        assert_eq!(req.keys.len(), 2);
    }

    #[test]
    fn decode_meta_set_s_token() {
        // The `S` flag overrides the positional data length.
        let (req, meta) = decode_request("ms key 5 S3\r\nabc\r\n");

        assert_eq!(meta.protocol, Protocol::Meta);
        assert_eq!(req.value.unwrap(), Bytes::from_static(b"abc"));
    }

    #[test]
    fn decode_flush_all_delay_noreply() {
        let (req, meta) = decode_request("flush_all 10 noreply\r\n");

        assert_eq!(req.op, Op::Flush);
        assert_eq!(req.exptime, Some(10));
        assert_eq!(meta.reply, ReplyMode::SuppressSuccess);
    }
}