Skip to main content

memcached_async/codec/
ascii.rs

1use bytes::{Buf, BufMut, Bytes, BytesMut};
2
3use crate::codec::DecodeOutcome;
4use crate::error::{Error, ErrorKind};
5use crate::response::Response;
6use crate::types::{
7    MetaCode, MetaFlags, MetaResponse, Op, Protocol, ReplyMode, Request, RequestMeta, StatLine,
8    ValueEntry,
9};
10
11/// ASCII/meta decode limits.
12#[derive(Debug, Clone, Copy)]
13pub struct AsciiLimits {
14    pub max_line_len: usize,
15    pub max_blob_len: usize,
16}
17
18impl Default for AsciiLimits {
19    fn default() -> Self {
20        Self {
21            max_line_len: 4096,
22            max_blob_len: 1 << 20,
23        }
24    }
25}
26
27#[derive(Debug, Clone)]
28struct BlobState {
29    len: usize,
30    req: Request,
31    meta: RequestMeta,
32}
33
34#[derive(Debug, Clone)]
35enum AsciiState {
36    Line,
37    Blob(Box<BlobState>),
38}
39
40/// Streaming ASCII/meta decoder.
41#[derive(Debug, Clone)]
42pub struct AsciiDecoder {
43    state: AsciiState,
44    discarding_line: bool,
45}
46
47impl Default for AsciiDecoder {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl AsciiDecoder {
54    pub fn new() -> Self {
55        Self {
56            state: AsciiState::Line,
57            discarding_line: false,
58        }
59    }
60
61    pub fn decode(&mut self, buf: &mut BytesMut, limits: AsciiLimits) -> Option<DecodeOutcome> {
62        let state = std::mem::replace(&mut self.state, AsciiState::Line);
63        match state {
64            AsciiState::Line => {
65                self.state = AsciiState::Line;
66                self.decode_line(buf, limits)
67            }
68            AsciiState::Blob(blob) => {
69                let BlobState { len, mut req, meta } = *blob;
70                if buf.len() < len + 2 {
71                    self.state = AsciiState::Blob(Box::new(BlobState { len, req, meta }));
72                    return None;
73                }
74                let payload = buf.split_to(len).freeze();
75                let cr = buf.get_u8();
76                let lf = buf.get_u8();
77                if cr != b'\r' || lf != b'\n' {
78                    let response = Response::Error(Error::client("bad data chunk"));
79                    self.state = AsciiState::Line;
80                    return Some(DecodeOutcome::Response(meta, response));
81                }
82                req.value = Some(payload);
83                self.state = AsciiState::Line;
84                Some(DecodeOutcome::Request(req, meta))
85            }
86        }
87    }
88
89    fn decode_line(&mut self, buf: &mut BytesMut, limits: AsciiLimits) -> Option<DecodeOutcome> {
90        if self.discarding_line {
91            if let Some(pos) = find_crlf(buf) {
92                buf.advance(pos + 2);
93                self.discarding_line = false;
94                let meta = RequestMeta::ascii();
95                let response = Response::Error(Error::client("line too long"));
96                return Some(DecodeOutcome::Response(meta, response));
97            }
98            if buf.len() > limits.max_line_len {
99                buf.clear();
100            }
101            return None;
102        }
103
104        let pos = match find_crlf(buf) {
105            Some(pos) => pos,
106            None => {
107                if buf.len() > limits.max_line_len {
108                    self.discarding_line = true;
109                }
110                return None;
111            }
112        };
113
114        if pos > limits.max_line_len {
115            buf.advance(pos + 2);
116            let meta = RequestMeta::ascii();
117            let response = Response::Error(Error::client("line too long"));
118            return Some(DecodeOutcome::Response(meta, response));
119        }
120
121        let line = buf.split_to(pos).freeze();
122        buf.advance(2);
123
124        if line.is_empty() {
125            let meta = RequestMeta::ascii();
126            let response = Response::Error(Error::client("empty command"));
127            return Some(DecodeOutcome::Response(meta, response));
128        }
129
130        match parse_line(line, limits.max_blob_len) {
131            Ok(LineParse::Line(req, meta)) => Some(DecodeOutcome::Request(req, meta)),
132            Ok(LineParse::Blob { len, mut req, meta }) => {
133                if buf.len() >= len + 2 {
134                    let payload = buf.split_to(len).freeze();
135                    let cr = buf.get_u8();
136                    let lf = buf.get_u8();
137                    if cr != b'\r' || lf != b'\n' {
138                        let response = Response::Error(Error::client("bad data chunk"));
139                        return Some(DecodeOutcome::Response(meta, response));
140                    }
141                    req.value = Some(payload);
142                    return Some(DecodeOutcome::Request(req, meta));
143                }
144                self.state = AsciiState::Blob(Box::new(BlobState { len, req, meta }));
145                None
146            }
147            Err(err) => Some(DecodeOutcome::Response(
148                RequestMeta::ascii(),
149                Response::Error(err),
150            )),
151        }
152    }
153}
154
155enum LineParse {
156    Line(Request, RequestMeta),
157    Blob {
158        len: usize,
159        req: Request,
160        meta: RequestMeta,
161    },
162}
163
164fn parse_line(line: Bytes, max_blob_len: usize) -> Result<LineParse, Error> {
165    let tokens = split_tokens(&line);
166    if tokens.is_empty() {
167        return Err(Error::client("empty command"));
168    }
169
170    let cmd = tokens[0].as_ref();
171    let op = parse_op(cmd);
172    let mut meta = RequestMeta::ascii();
173    let mut req = Request::new(op);
174
175    match op {
176        Op::Get | Op::Gets => {
177            if tokens.len() < 2 {
178                return Err(Error::client("missing key"));
179            }
180            let keys = parse_keys(&tokens[1..])?;
181            if keys.len() == 1 {
182                req.key = Some(keys[0].clone());
183            } else {
184                req.keys = keys;
185            }
186        }
187        Op::Gat | Op::Gats => {
188            if tokens.len() < 3 {
189                return Err(Error::client("missing exptime or key"));
190            }
191            let exptime =
192                parse_i64(tokens[1].as_ref()).ok_or_else(|| Error::client("invalid exptime"))?;
193            req.exptime = Some(exptime);
194            let keys = parse_keys(&tokens[2..])?;
195            if keys.len() == 1 {
196                req.key = Some(keys[0].clone());
197            } else {
198                req.keys = keys;
199            }
200        }
201        Op::Set | Op::Add | Op::Replace | Op::Append | Op::Prepend | Op::Cas => {
202            let min = if matches!(op, Op::Cas) { 6 } else { 5 };
203            if tokens.len() < min {
204                return Err(Error::client("missing arguments"));
205            }
206            let mut end = tokens.len();
207            let mut reply = ReplyMode::Always;
208            if tokens.len() > min && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
209                reply = ReplyMode::SuppressSuccess;
210                end -= 1;
211            }
212            if end != min {
213                return Err(Error::client("invalid arguments"));
214            }
215            let key = tokens[1].clone();
216            validate_key(&key)?;
217            req.key = Some(key);
218            req.flags =
219                Some(parse_u32(tokens[2].as_ref()).ok_or_else(|| Error::client("invalid flags"))?);
220            req.exptime = Some(
221                parse_i64(tokens[3].as_ref()).ok_or_else(|| Error::client("invalid exptime"))?,
222            );
223            let bytes =
224                parse_usize(tokens[4].as_ref()).ok_or_else(|| Error::client("invalid bytes"))?;
225            if bytes > max_blob_len {
226                return Err(Error::server("value too large"));
227            }
228            if matches!(op, Op::Cas) {
229                req.cas = Some(
230                    parse_u64(tokens[5].as_ref()).ok_or_else(|| Error::client("invalid cas"))?,
231                );
232            }
233            meta.reply = reply;
234            return Ok(LineParse::Blob {
235                len: bytes,
236                req,
237                meta,
238            });
239        }
240        Op::Delete => {
241            if tokens.len() < 2 {
242                return Err(Error::client("missing key"));
243            }
244            let mut reply = ReplyMode::Always;
245            if tokens.len() > 2 && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
246                reply = ReplyMode::SuppressSuccess;
247                if tokens.len() != 3 {
248                    return Err(Error::client("invalid arguments"));
249                }
250            } else if tokens.len() != 2 {
251                return Err(Error::client("invalid arguments"));
252            }
253            let key = tokens[1].clone();
254            validate_key(&key)?;
255            req.key = Some(key);
256            meta.reply = reply;
257        }
258        Op::Flush => {
259            let mut reply = ReplyMode::Always;
260            let mut delay: Option<i64> = None;
261            match tokens.len() {
262                1 => {}
263                2 => {
264                    if tokens[1].as_ref() == b"noreply" {
265                        reply = ReplyMode::SuppressSuccess;
266                    } else {
267                        delay = Some(
268                            parse_i64(tokens[1].as_ref())
269                                .ok_or_else(|| Error::client("invalid exptime"))?,
270                        );
271                    }
272                }
273                3 => {
274                    if tokens[2].as_ref() != b"noreply" {
275                        return Err(Error::client("invalid arguments"));
276                    }
277                    reply = ReplyMode::SuppressSuccess;
278                    delay = Some(
279                        parse_i64(tokens[1].as_ref())
280                            .ok_or_else(|| Error::client("invalid exptime"))?,
281                    );
282                }
283                _ => return Err(Error::client("invalid arguments")),
284            }
285            if let Some(value) = delay {
286                if value < 0 {
287                    return Err(Error::client("invalid exptime"));
288                }
289                req.exptime = Some(value);
290            }
291            meta.reply = reply;
292        }
293        Op::Incr | Op::Decr => {
294            if tokens.len() < 3 {
295                return Err(Error::client("missing arguments"));
296            }
297            let mut reply = ReplyMode::Always;
298            if tokens.len() > 3 && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
299                reply = ReplyMode::SuppressSuccess;
300                if tokens.len() != 4 {
301                    return Err(Error::client("invalid arguments"));
302                }
303            } else if tokens.len() != 3 {
304                return Err(Error::client("invalid arguments"));
305            }
306            let key = tokens[1].clone();
307            validate_key(&key)?;
308            req.key = Some(key);
309            req.delta =
310                Some(parse_u64(tokens[2].as_ref()).ok_or_else(|| Error::client("invalid delta"))?);
311            meta.reply = reply;
312        }
313        Op::Touch => {
314            if tokens.len() < 3 {
315                return Err(Error::client("missing arguments"));
316            }
317            let mut reply = ReplyMode::Always;
318            if tokens.len() > 3 && tokens.last().map(|t| t.as_ref()) == Some(b"noreply") {
319                reply = ReplyMode::SuppressSuccess;
320                if tokens.len() != 4 {
321                    return Err(Error::client("invalid arguments"));
322                }
323            } else if tokens.len() != 3 {
324                return Err(Error::client("invalid arguments"));
325            }
326            let key = tokens[1].clone();
327            validate_key(&key)?;
328            req.key = Some(key);
329            req.exptime = Some(
330                parse_i64(tokens[2].as_ref()).ok_or_else(|| Error::client("invalid exptime"))?,
331            );
332            meta.reply = reply;
333        }
334        Op::Stats => {
335            if tokens.len() > 1 {
336                let keys = parse_keys(&tokens[1..])?;
337                req.keys = keys;
338            }
339        }
340        Op::Version | Op::Quit => {
341            if tokens.len() != 1 {
342                return Err(Error::client("invalid arguments"));
343            }
344        }
345        Op::MetaGet
346        | Op::MetaSet
347        | Op::MetaDelete
348        | Op::MetaArithmetic
349        | Op::MetaDebug
350        | Op::MetaNoop => {
351            meta.protocol = Protocol::Meta;
352            if op == Op::MetaNoop {
353                if tokens.len() != 1 {
354                    return Err(Error::client("invalid arguments"));
355                }
356                return Ok(LineParse::Line(req, meta));
357            }
358            if tokens.len() < 2 {
359                return Err(Error::client("missing key"));
360            }
361            let mut idx = 2;
362            let mut datalen = None;
363            if op == Op::MetaSet {
364                if tokens.len() < 3 {
365                    return Err(Error::client("missing data length"));
366                }
367                datalen = Some(
368                    parse_usize(tokens[2].as_ref())
369                        .ok_or_else(|| Error::client("invalid data length"))?,
370                );
371                idx = 3;
372            }
373            let meta_flags = parse_meta_flags(&tokens[idx..]);
374            if meta_flags.has(b'q') {
375                meta.reply = if op == Op::MetaGet {
376                    ReplyMode::SuppressMiss
377                } else {
378                    ReplyMode::SuppressSuccess
379                };
380            }
381            let mut key = tokens[1].clone();
382            if meta_flags.has(b'b') {
383                let decoded = decode_base64(key.as_ref())?;
384                if decoded.len() > 250 {
385                    return Err(Error::client("key too long"));
386                }
387                key = Bytes::from(decoded);
388            } else {
389                validate_key(&key)?;
390            }
391            req.key = Some(key);
392            if let Some(delta) = meta_flags
393                .token(b'D')
394                .and_then(|token| parse_u64(token.as_ref()))
395            {
396                req.delta = Some(delta);
397            }
398            if let Some(initial) = meta_flags
399                .token(b'J')
400                .and_then(|token| parse_u64(token.as_ref()))
401            {
402                req.initial = Some(initial);
403            }
404            if let Some(cas) = meta_flags
405                .token(b'C')
406                .and_then(|token| parse_u64(token.as_ref()))
407            {
408                req.cas = Some(cas);
409            }
410            req.meta = Some(meta_flags.clone());
411            if op == Op::MetaSet {
412                let mut length = datalen.unwrap_or(0);
413                if let Some(slen) = meta_flags
414                    .token(b'S')
415                    .and_then(|token| parse_usize(token.as_ref()))
416                {
417                    length = slen;
418                }
419                if length > max_blob_len {
420                    return Err(Error::server("value too large"));
421                }
422                if length == 0 {
423                    return Ok(LineParse::Line(req, meta));
424                }
425                return Ok(LineParse::Blob {
426                    len: length,
427                    req,
428                    meta,
429                });
430            }
431        }
432        Op::SaslListMechs | Op::SaslAuth | Op::SaslStep | Op::Unknown => {
433            req.op = Op::Unknown;
434        }
435        Op::Noop => {}
436    }
437
438    Ok(LineParse::Line(req, meta))
439}
440
441fn parse_op(cmd: &[u8]) -> Op {
442    match cmd {
443        b"get" => Op::Get,
444        b"gets" => Op::Gets,
445        b"gat" => Op::Gat,
446        b"gats" => Op::Gats,
447        b"set" => Op::Set,
448        b"add" => Op::Add,
449        b"replace" => Op::Replace,
450        b"append" => Op::Append,
451        b"prepend" => Op::Prepend,
452        b"cas" => Op::Cas,
453        b"delete" => Op::Delete,
454        b"flush_all" => Op::Flush,
455        b"incr" => Op::Incr,
456        b"decr" => Op::Decr,
457        b"touch" => Op::Touch,
458        b"stats" => Op::Stats,
459        b"version" => Op::Version,
460        b"quit" => Op::Quit,
461        b"mg" => Op::MetaGet,
462        b"ms" => Op::MetaSet,
463        b"md" => Op::MetaDelete,
464        b"ma" => Op::MetaArithmetic,
465        b"me" => Op::MetaDebug,
466        b"mn" => Op::MetaNoop,
467        _ => Op::Unknown,
468    }
469}
470
471fn parse_keys(tokens: &[Bytes]) -> Result<Vec<Bytes>, Error> {
472    let mut keys = Vec::with_capacity(tokens.len());
473    for token in tokens {
474        validate_key(token)?;
475        keys.push(token.clone());
476    }
477    Ok(keys)
478}
479
480fn validate_key(key: &Bytes) -> Result<(), Error> {
481    if key.is_empty() {
482        return Err(Error::client("empty key"));
483    }
484    if key.len() > 250 {
485        return Err(Error::client("key too long"));
486    }
487    for &b in key.as_ref() {
488        if b <= b' ' || b == 0x7f {
489            return Err(Error::client("invalid key"));
490        }
491    }
492    Ok(())
493}
494
495fn split_tokens(line: &Bytes) -> Vec<Bytes> {
496    let mut tokens = Vec::new();
497    let mut start = 0;
498    let bytes = line.as_ref();
499    while start < bytes.len() {
500        while start < bytes.len() && bytes[start] == b' ' {
501            start += 1;
502        }
503        if start >= bytes.len() {
504            break;
505        }
506        let mut end = start;
507        while end < bytes.len() && bytes[end] != b' ' {
508            end += 1;
509        }
510        tokens.push(line.slice(start..end));
511        start = end;
512    }
513    tokens
514}
515
516fn find_crlf(buf: &BytesMut) -> Option<usize> {
517    let bytes = buf.as_ref();
518    if bytes.len() < 2 {
519        return None;
520    }
521    let mut i = 0;
522    while i + 1 < bytes.len() {
523        if bytes[i] == b'\r' && bytes[i + 1] == b'\n' {
524            return Some(i);
525        }
526        i += 1;
527    }
528    None
529}
530
531fn parse_u32(token: &[u8]) -> Option<u32> {
532    parse_u64(token).and_then(|value| u32::try_from(value).ok())
533}
534
535fn parse_usize(token: &[u8]) -> Option<usize> {
536    parse_u64(token).and_then(|value| usize::try_from(value).ok())
537}
538
539fn parse_u64(token: &[u8]) -> Option<u64> {
540    if token.is_empty() {
541        return None;
542    }
543    let mut value: u64 = 0;
544    for &b in token {
545        if !b.is_ascii_digit() {
546            return None;
547        }
548        value = value.checked_mul(10)?;
549        value = value.checked_add((b - b'0') as u64)?;
550    }
551    Some(value)
552}
553
554fn parse_i64(token: &[u8]) -> Option<i64> {
555    if token.is_empty() {
556        return None;
557    }
558    let (neg, rest) = if token[0] == b'-' {
559        (true, &token[1..])
560    } else {
561        (false, token)
562    };
563    let value = parse_u64(rest)? as i64;
564    if neg { Some(-value) } else { Some(value) }
565}
566
567fn parse_meta_flags(tokens: &[Bytes]) -> MetaFlags {
568    let mut ordered = Vec::with_capacity(tokens.len());
569    for token in tokens {
570        if token.is_empty() {
571            continue;
572        }
573        let code = token.as_ref()[0];
574        let rest = if token.len() > 1 {
575            Some(token.slice(1..))
576        } else {
577            None
578        };
579        ordered.push(crate::types::MetaFlag { code, token: rest });
580    }
581    MetaFlags::new(ordered)
582}
583
584fn decode_base64(input: &[u8]) -> Result<Vec<u8>, Error> {
585    if !input.len().is_multiple_of(4) {
586        return Err(Error::client("invalid base64"));
587    }
588    let mut out = Vec::with_capacity(input.len() / 4 * 3);
589    let mut i = 0;
590    while i < input.len() {
591        let a = decode_base64_val(input[i])?;
592        let b = decode_base64_val(input[i + 1])?;
593        let c = input[i + 2];
594        let d = input[i + 3];
595        let c_val = if c == b'=' {
596            None
597        } else {
598            Some(decode_base64_val(c)?)
599        };
600        let d_val = if d == b'=' {
601            None
602        } else {
603            Some(decode_base64_val(d)?)
604        };
605
606        out.push((a << 2) | (b >> 4));
607        if let Some(c_val) = c_val {
608            out.push((b << 4) | (c_val >> 2));
609            if let Some(d_val) = d_val {
610                out.push((c_val << 6) | d_val);
611            }
612        }
613        i += 4;
614    }
615    Ok(out)
616}
617
618fn decode_base64_val(byte: u8) -> Result<u8, Error> {
619    match byte {
620        b'A'..=b'Z' => Ok(byte - b'A'),
621        b'a'..=b'z' => Ok(byte - b'a' + 26),
622        b'0'..=b'9' => Ok(byte - b'0' + 52),
623        b'+' => Ok(62),
624        b'/' => Ok(63),
625        b'=' => Ok(0),
626        _ => Err(Error::client("invalid base64")),
627    }
628}
629
630pub fn should_suppress_ascii(meta: RequestMeta, response: &Response) -> bool {
631    match meta.reply {
632        ReplyMode::Always => false,
633        ReplyMode::SuppressMiss => {
634            if let Response::Meta(meta_resp) = response {
635                meta_resp.code.is_miss()
636            } else {
637                false
638            }
639        }
640        ReplyMode::SuppressSuccess => !matches!(response, Response::Error(_)),
641        ReplyMode::QuietBuffered => false,
642    }
643}
644
645pub fn should_suppress_meta(meta: RequestMeta, response: &Response) -> bool {
646    match meta.reply {
647        ReplyMode::Always => false,
648        ReplyMode::SuppressMiss => {
649            if let Response::Meta(meta_resp) = response {
650                meta_resp.code.is_miss()
651            } else {
652                false
653            }
654        }
655        ReplyMode::SuppressSuccess => {
656            if let Response::Meta(meta_resp) = response {
657                meta_resp.code.is_success()
658            } else {
659                false
660            }
661        }
662        ReplyMode::QuietBuffered => false,
663    }
664}
665
666pub fn encode_ascii_response(
667    req: &Request,
668    meta: RequestMeta,
669    response: &Response,
670    out: &mut BytesMut,
671) -> bool {
672    if should_suppress_ascii(meta, response) {
673        return false;
674    }
675    match response {
676        Response::Stored => out.extend_from_slice(b"STORED\r\n"),
677        Response::NotStored => out.extend_from_slice(b"NOT_STORED\r\n"),
678        Response::Exists => out.extend_from_slice(b"EXISTS\r\n"),
679        Response::NotFound => out.extend_from_slice(b"NOT_FOUND\r\n"),
680        Response::Deleted => out.extend_from_slice(b"DELETED\r\n"),
681        Response::Touched => out.extend_from_slice(b"TOUCHED\r\n"),
682        Response::Ok => out.extend_from_slice(b"OK\r\n"),
683        Response::Numeric(value) => {
684            write_u64(out, *value);
685            out.extend_from_slice(b"\r\n");
686        }
687        Response::Value(entry) => {
688            let include_cas = matches!(req.op, Op::Gets | Op::Gats);
689            encode_value_entry(entry, include_cas, out);
690            out.extend_from_slice(b"END\r\n");
691        }
692        Response::Values(entries) => {
693            let include_cas = matches!(req.op, Op::Gets | Op::Gats);
694            for entry in entries {
695                encode_value_entry(entry, include_cas, out);
696            }
697            out.extend_from_slice(b"END\r\n");
698        }
699        Response::Stats(lines) => {
700            for line in lines {
701                encode_stat_line(line, out);
702            }
703            out.extend_from_slice(b"END\r\n");
704        }
705        Response::Version(version) => {
706            out.extend_from_slice(b"VERSION ");
707            out.extend_from_slice(version);
708            out.extend_from_slice(b"\r\n");
709        }
710        Response::Noop => {
711            if req.op == Op::MetaNoop {
712                out.extend_from_slice(b"MN\r\n");
713            }
714        }
715        Response::Error(err) => match err.kind {
716            ErrorKind::UnknownCommand => out.extend_from_slice(b"ERROR\r\n"),
717            ErrorKind::Client | ErrorKind::Auth => {
718                out.extend_from_slice(b"CLIENT_ERROR ");
719                out.extend_from_slice(err.message.as_ref());
720                out.extend_from_slice(b"\r\n");
721            }
722            ErrorKind::Server => {
723                out.extend_from_slice(b"SERVER_ERROR ");
724                out.extend_from_slice(err.message.as_ref());
725                out.extend_from_slice(b"\r\n");
726            }
727        },
728        Response::Meta(_) | Response::ValuesStream(_) | Response::StatsStream(_) => {}
729    }
730    true
731}
732
733pub fn encode_meta_response(
734    req: &Request,
735    meta: RequestMeta,
736    response: &MetaResponse,
737    out: &mut BytesMut,
738) {
739    if should_suppress_meta(meta, &Response::Meta(response.clone())) {
740        return;
741    }
742    out.extend_from_slice(response.code.as_bytes());
743    if response.code == MetaCode::Va {
744        out.extend_from_slice(b" ");
745        let size = response
746            .size
747            .or_else(|| response.value.as_ref().map(|value| value.len()))
748            .unwrap_or(0);
749        write_usize(out, size);
750    }
751
752    if let Some(flags) = req.meta.as_ref() {
753        for flag in &flags.ordered {
754            append_meta_token(req, response, flag.code, out);
755        }
756    }
757
758    if let Some(win) = response.extra.won {
759        out.extend_from_slice(match win {
760            crate::types::WinState::Won => b" W",
761            crate::types::WinState::AlreadyWon => b" Z",
762        });
763    }
764    if response.extra.stale {
765        out.extend_from_slice(b" X");
766    }
767
768    out.extend_from_slice(b"\r\n");
769
770    let want_value = req
771        .meta
772        .as_ref()
773        .map(|flags| flags.has(b'v'))
774        .unwrap_or(false);
775    if want_value && let Some(value) = response.value.as_ref() {
776        out.extend_from_slice(value);
777        out.extend_from_slice(b"\r\n");
778    }
779}
780
781pub fn encode_meta_debug(
782    req: &Request,
783    lines: impl IntoIterator<Item = StatLine>,
784    out: &mut BytesMut,
785) {
786    let key = match req.key.as_ref() {
787        Some(key) => key.as_ref(),
788        None => b"",
789    };
790    out.extend_from_slice(b"ME ");
791    out.extend_from_slice(key);
792    for line in lines {
793        out.extend_from_slice(b" ");
794        out.extend_from_slice(line.key.as_ref());
795        out.extend_from_slice(b"=");
796        out.extend_from_slice(line.value.as_ref());
797    }
798    out.extend_from_slice(b"\r\n");
799}
800
801pub fn encode_value_entry(entry: &ValueEntry, include_cas: bool, out: &mut BytesMut) {
802    out.extend_from_slice(b"VALUE ");
803    out.extend_from_slice(entry.key.as_ref());
804    out.extend_from_slice(b" ");
805    write_u32(out, entry.flags);
806    out.extend_from_slice(b" ");
807    write_usize(out, entry.value.len());
808    if include_cas {
809        out.extend_from_slice(b" ");
810        write_u64(out, entry.cas.unwrap_or(0));
811    }
812    out.extend_from_slice(b"\r\n");
813    out.extend_from_slice(entry.value.as_ref());
814    out.extend_from_slice(b"\r\n");
815}
816
817pub fn encode_stat_line(line: &StatLine, out: &mut BytesMut) {
818    out.extend_from_slice(b"STAT ");
819    out.extend_from_slice(line.key.as_ref());
820    out.extend_from_slice(b" ");
821    out.extend_from_slice(line.value.as_ref());
822    out.extend_from_slice(b"\r\n");
823}
824
825fn append_meta_token(req: &Request, response: &MetaResponse, code: u8, out: &mut BytesMut) {
826    match code {
827        b'O' => {
828            if let Some(token) = req.meta.as_ref().and_then(|flags| flags.token(b'O')) {
829                out.extend_from_slice(b" O");
830                out.extend_from_slice(token.as_ref());
831            }
832        }
833        b'k' => {
834            if let Some(key) = req.key.as_ref() {
835                out.extend_from_slice(b" k");
836                out.extend_from_slice(key.as_ref());
837            }
838        }
839        b'c' => {
840            if let Some(cas) = response.cas {
841                out.extend_from_slice(b" c");
842                write_u64(out, cas);
843            }
844        }
845        b't' => {
846            if let Some(ttl) = response.ttl {
847                out.extend_from_slice(b" t");
848                write_i64(out, ttl);
849            }
850        }
851        b'f' => {
852            if let Some(flags) = response.flags {
853                out.extend_from_slice(b" f");
854                write_u32(out, flags);
855            }
856        }
857        b's' => {
858            if let Some(size) = response.size {
859                out.extend_from_slice(b" s");
860                write_usize(out, size);
861            }
862        }
863        b'h' => {
864            if let Some(hit) = response.hit {
865                out.extend_from_slice(b" h");
866                out.extend_from_slice(if hit { b"1" } else { b"0" });
867            }
868        }
869        b'l' => {
870            if let Some(last) = response.last_access {
871                out.extend_from_slice(b" l");
872                write_u64(out, last);
873            }
874        }
875        _ => {}
876    }
877}
878
879fn write_u32(out: &mut BytesMut, value: u32) {
880    write_u64(out, value as u64)
881}
882
883fn write_usize(out: &mut BytesMut, value: usize) {
884    write_u64(out, value as u64)
885}
886
887fn write_u64(out: &mut BytesMut, mut value: u64) {
888    let mut buf = [0u8; 20];
889    let mut i = buf.len();
890    if value == 0 {
891        out.put_u8(b'0');
892        return;
893    }
894    while value > 0 {
895        i -= 1;
896        buf[i] = b'0' + (value % 10) as u8;
897        value /= 10;
898    }
899    out.extend_from_slice(&buf[i..]);
900}
901
902fn write_i64(out: &mut BytesMut, value: i64) {
903    if value < 0 {
904        out.put_u8(b'-');
905        write_u64(out, (-value) as u64);
906    } else {
907        write_u64(out, value as u64);
908    }
909}
910
911#[cfg(test)]
912mod tests {
913    use super::*;
914    use bytes::BytesMut;
915
916    #[test]
917    fn decode_set_with_value() {
918        let mut decoder = AsciiDecoder::new();
919        let mut buf = BytesMut::from("set key 1 10 5\r\nhello\r\n");
920        let limits = AsciiLimits::default();
921
922        let outcome = decoder.decode(&mut buf, limits);
923        let (req, meta) = match outcome {
924            Some(DecodeOutcome::Request(req, meta)) => (req, meta),
925            _ => panic!("unexpected decode outcome"),
926        };
927        assert_eq!(req.op, Op::Set);
928        assert_eq!(req.value.unwrap(), Bytes::from_static(b"hello"));
929        assert_eq!(meta.reply, ReplyMode::Always);
930    }
931
932    #[test]
933    fn decode_multi_get() {
934        let mut decoder = AsciiDecoder::new();
935        let mut buf = BytesMut::from("get k1 k2\r\n");
936        let outcome = decoder.decode(&mut buf, AsciiLimits::default());
937        let (req, _) = match outcome {
938            Some(DecodeOutcome::Request(req, meta)) => (req, meta),
939            _ => panic!("unexpected decode outcome"),
940        };
941        assert_eq!(req.op, Op::Get);
942        assert_eq!(req.keys.len(), 2);
943    }
944
945    #[test]
946    fn decode_meta_set_s_token() {
947        let mut decoder = AsciiDecoder::new();
948        let mut buf = BytesMut::from("ms key 5 S3\r\nabc\r\n");
949        let limits = AsciiLimits::default();
950
951        let outcome = decoder.decode(&mut buf, limits);
952        let (req, meta) = match outcome {
953            Some(DecodeOutcome::Request(req, meta)) => (req, meta),
954            _ => panic!("unexpected decode outcome"),
955        };
956        assert_eq!(meta.protocol, Protocol::Meta);
957        assert_eq!(req.value.unwrap(), Bytes::from_static(b"abc"));
958    }
959
960    #[test]
961    fn decode_flush_all_delay_noreply() {
962        let mut decoder = AsciiDecoder::new();
963        let mut buf = BytesMut::from("flush_all 10 noreply\r\n");
964        let outcome = decoder.decode(&mut buf, AsciiLimits::default());
965        let (req, meta) = match outcome {
966            Some(DecodeOutcome::Request(req, meta)) => (req, meta),
967            _ => panic!("unexpected decode outcome"),
968        };
969        assert_eq!(req.op, Op::Flush);
970        assert_eq!(req.exptime, Some(10));
971        assert_eq!(meta.reply, ReplyMode::SuppressSuccess);
972    }
973}