mtop_client/
core.rs

1use std::borrow::Borrow;
2use std::cmp::Ordering;
3use std::collections::{BTreeSet, HashMap};
4use std::error;
5use std::fmt;
6use std::io;
7use std::ops::Deref;
8use std::str::FromStr;
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, Lines};
11
/// Subset of the values reported by a Memcached server in response to the
/// `stats` command.
///
/// Unless noted otherwise, each field is parsed from the stat with the same
/// name as the field.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Stats {
    // Server info
    pub pid: i64,
    pub uptime: u64,
    /// Parsed from the `time` stat.
    pub server_time: i64,
    pub threads: u64,
    pub version: String,

    // CPU
    pub rusage_user: f64,
    pub rusage_system: f64,

    // Connections
    pub max_connections: u64,
    pub curr_connections: u64,
    pub total_connections: u64,
    pub rejected_connections: u64,

    // Commands
    pub cmd_get: u64,
    pub cmd_set: u64,
    pub cmd_flush: u64,
    pub cmd_touch: u64,
    pub cmd_meta: u64,

    // Gets
    pub get_hits: u64,
    pub get_misses: u64,
    pub get_expired: u64,
    pub get_flushed: u64,

    // Sets
    pub store_too_large: u64,
    pub store_no_memory: u64,

    // Deletes
    pub delete_hits: u64,
    pub delete_misses: u64,

    // Incr/Decr
    pub incr_hits: u64,
    pub incr_misses: u64,
    pub decr_hits: u64,
    pub decr_misses: u64,

    // Touches
    pub touch_hits: u64,
    pub touch_misses: u64,

    // Bytes
    pub bytes_read: u64,
    pub bytes_written: u64,
    pub bytes: u64,
    /// Parsed from the `limit_maxbytes` stat.
    pub max_bytes: u64,

    // Items
    pub curr_items: u64,
    pub total_items: u64,
    pub evictions: u64,
}
73
74impl TryFrom<&HashMap<String, String>> for Stats {
75    type Error = MtopError;
76
77    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
78        Ok(Stats {
79            pid: parse_field("pid", value)?,
80            uptime: parse_field("uptime", value)?,
81            server_time: parse_field("time", value)?,
82            version: parse_field("version", value)?,
83            threads: parse_field("threads", value)?,
84
85            rusage_user: parse_field("rusage_user", value)?,
86            rusage_system: parse_field("rusage_system", value)?,
87
88            max_connections: parse_field("max_connections", value)?,
89            curr_connections: parse_field("curr_connections", value)?,
90            total_connections: parse_field("total_connections", value)?,
91            rejected_connections: parse_field("rejected_connections", value)?,
92
93            cmd_get: parse_field("cmd_get", value)?,
94            cmd_set: parse_field("cmd_set", value)?,
95            cmd_flush: parse_field("cmd_flush", value)?,
96            cmd_touch: parse_field("cmd_touch", value)?,
97            cmd_meta: parse_field("cmd_meta", value)?,
98
99            get_hits: parse_field("get_hits", value)?,
100            get_misses: parse_field("get_misses", value)?,
101            get_expired: parse_field("get_expired", value)?,
102            get_flushed: parse_field("get_flushed", value)?,
103
104            store_too_large: parse_field("store_too_large", value)?,
105            store_no_memory: parse_field("store_no_memory", value)?,
106
107            delete_hits: parse_field("delete_hits", value)?,
108            delete_misses: parse_field("delete_misses", value)?,
109
110            incr_hits: parse_field("incr_hits", value)?,
111            incr_misses: parse_field("incr_misses", value)?,
112
113            decr_hits: parse_field("decr_hits", value)?,
114            decr_misses: parse_field("decr_misses", value)?,
115
116            touch_hits: parse_field("touch_hits", value)?,
117            touch_misses: parse_field("touch_misses", value)?,
118
119            bytes_read: parse_field("bytes_read", value)?,
120            bytes_written: parse_field("bytes_written", value)?,
121            bytes: parse_field("bytes", value)?,
122            max_bytes: parse_field("limit_maxbytes", value)?,
123
124            curr_items: parse_field("curr_items", value)?,
125            total_items: parse_field("total_items", value)?,
126            evictions: parse_field("evictions", value)?,
127        })
128    }
129}
130
/// Statistics for a single slab class, parsed from the `<id>:<name>` entries
/// of a `stats slabs` response.
///
/// Equality and hashing consider every field, while ordering only looks at
/// `id`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Slab {
    pub id: u64,
    pub chunk_size: u64,
    pub chunks_per_page: u64,
    pub total_pages: u64,
    pub total_chunks: u64,
    pub used_chunks: u64,
    pub free_chunks: u64,
    pub get_hits: u64,
    pub cmd_set: u64,
    pub delete_hits: u64,
    pub incr_hits: u64,
    pub decr_hits: u64,
    pub cas_hits: u64,
    pub cas_badval: u64,
    pub touch_hits: u64,
}

impl PartialOrd for Slab {
    /// Delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for Slab {
    /// Slabs are ordered by their numeric ID alone.
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.id, &rhs.id)
    }
}
161
162#[derive(Debug, Default, PartialEq, Eq, Clone)]
163#[repr(transparent)]
164pub struct Slabs {
165    slabs: Vec<Slab>,
166}
167
168impl Slabs {
169    pub fn iter(&self) -> impl ExactSizeIterator<Item = &Slab> {
170        self.slabs.iter()
171    }
172
173    pub fn len(&self) -> usize {
174        self.slabs.len()
175    }
176
177    pub fn is_empty(&self) -> bool {
178        self.slabs.is_empty()
179    }
180
181    pub fn find_for_size(&self, size: u64) -> Option<&Slab> {
182        // Find the slab with an appropriate chunk size for an item with the given
183        // size. If there is no slab with a chunk size that fits the item, return the
184        // last (hence largest) slab class since this is what Memcached does internally.
185        self.slabs
186            .get(self.slabs.partition_point(|s| s.chunk_size < size))
187            .or_else(|| self.slabs.last())
188    }
189}
190
191impl IntoIterator for Slabs {
192    type Item = Slab;
193    type IntoIter = std::vec::IntoIter<Slab>;
194
195    fn into_iter(self) -> Self::IntoIter {
196        self.slabs.into_iter()
197    }
198}
199
200impl TryFrom<&HashMap<String, String>> for Slabs {
201    type Error = MtopError;
202
203    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
204        // Parse the slab IDs from each of the raw stats. We have to do this because
205        // Memcached isn't guaranteed to use a particular slab ID if there are no items
206        // to store in that size class. Otherwise, we could just loop from one to
207        // $active_slabs + 1.
208        let mut ids = BTreeSet::new();
209        for k in value.keys() {
210            let key_id: Option<u64> = k.split_once(':').map(|(raw, _rest)| raw).and_then(|raw| raw.parse().ok());
211
212            if let Some(id) = key_id {
213                ids.insert(id);
214            }
215        }
216
217        let mut slabs = Vec::with_capacity(ids.len());
218
219        for id in ids {
220            slabs.push(Slab {
221                id,
222                chunk_size: parse_field(&format!("{}:chunk_size", id), value)?,
223                chunks_per_page: parse_field(&format!("{}:chunks_per_page", id), value)?,
224                total_pages: parse_field(&format!("{}:total_pages", id), value)?,
225                total_chunks: parse_field(&format!("{}:total_chunks", id), value)?,
226                used_chunks: parse_field(&format!("{}:used_chunks", id), value)?,
227                free_chunks: parse_field(&format!("{}:free_chunks", id), value)?,
228                get_hits: parse_field(&format!("{}:get_hits", id), value)?,
229                cmd_set: parse_field(&format!("{}:cmd_set", id), value)?,
230                delete_hits: parse_field(&format!("{}:delete_hits", id), value)?,
231                incr_hits: parse_field(&format!("{}:incr_hits", id), value)?,
232                decr_hits: parse_field(&format!("{}:decr_hits", id), value)?,
233                cas_hits: parse_field(&format!("{}:cas_hits", id), value)?,
234                cas_badval: parse_field(&format!("{}:cas_badval", id), value)?,
235                touch_hits: parse_field(&format!("{}:touch_hits", id), value)?,
236            })
237        }
238
239        Ok(Self { slabs })
240    }
241}
242
/// Item statistics for a single slab class, parsed from the
/// `items:<id>:<name>` entries of a `stats items` response.
///
/// Equality and hashing consider every field, while ordering only looks at
/// `id`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct SlabItem {
    pub id: u64,
    pub number: u64,
    pub number_hot: u64,
    pub number_warm: u64,
    pub number_cold: u64,
    pub age_hot: u64,
    pub age_warm: u64,
    pub age: u64,
    pub mem_requested: u64,
    pub evicted: u64,
    pub evicted_nonzero: u64,
    pub evicted_time: u64,
    /// Parsed from the `outofmemory` stat.
    pub out_of_memory: u64,
    /// Parsed from the `tailrepairs` stat.
    pub tail_repairs: u64,
    pub reclaimed: u64,
    pub expired_unfetched: u64,
    pub evicted_unfetched: u64,
    pub evicted_active: u64,
    pub crawler_reclaimed: u64,
    pub crawler_items_checked: u64,
    pub lrutail_reflocked: u64,
    pub moves_to_cold: u64,
    pub moves_to_warm: u64,
    pub moves_within_lru: u64,
    pub direct_reclaims: u64,
    pub hits_to_hot: u64,
    pub hits_to_warm: u64,
    pub hits_to_cold: u64,
    pub hits_to_temp: u64,
}

impl PartialOrd for SlabItem {
    /// Delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for SlabItem {
    /// Entries are ordered by their slab ID alone.
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.id, &rhs.id)
    }
}
287
288#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
289#[repr(transparent)]
290pub struct SlabItems {
291    items: Vec<SlabItem>,
292}
293
294impl SlabItems {
295    pub fn iter(&self) -> impl ExactSizeIterator<Item = &SlabItem> {
296        self.items.iter()
297    }
298
299    pub fn len(&self) -> usize {
300        self.items.len()
301    }
302
303    pub fn is_empty(&self) -> bool {
304        self.items.is_empty()
305    }
306}
307
308impl IntoIterator for SlabItems {
309    type Item = SlabItem;
310    type IntoIter = std::vec::IntoIter<SlabItem>;
311
312    fn into_iter(self) -> Self::IntoIter {
313        self.items.into_iter()
314    }
315}
316
317impl TryFrom<&HashMap<String, String>> for SlabItems {
318    type Error = MtopError;
319
320    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
321        // Parse the slab IDs from each of the raw stats. We have to do this because
322        // Memcached isn't guaranteed to use a particular slab ID if there are no items
323        // to store in that size class. Otherwise, we could just loop from one to
324        // $active_slabs + 1.
325        let mut ids = BTreeSet::new();
326        for k in value.keys() {
327            let key_id: Option<u64> = k
328                .trim_start_matches("items:")
329                .split_once(':')
330                .map(|(raw, _rest)| raw)
331                .and_then(|raw| raw.parse().ok());
332
333            if let Some(id) = key_id {
334                ids.insert(id);
335            }
336        }
337
338        let mut items = Vec::with_capacity(ids.len());
339
340        for id in ids {
341            items.push(SlabItem {
342                id,
343                number: parse_field(&format!("items:{}:number", id), value)?,
344                number_hot: parse_field(&format!("items:{}:number_hot", id), value)?,
345                number_warm: parse_field(&format!("items:{}:number_warm", id), value)?,
346                number_cold: parse_field(&format!("items:{}:number_cold", id), value)?,
347                age_hot: parse_field(&format!("items:{}:age_hot", id), value)?,
348                age_warm: parse_field(&format!("items:{}:age_warm", id), value)?,
349                age: parse_field(&format!("items:{}:age", id), value)?,
350                mem_requested: parse_field(&format!("items:{}:mem_requested", id), value)?,
351                evicted: parse_field(&format!("items:{}:evicted", id), value)?,
352                evicted_nonzero: parse_field(&format!("items:{}:evicted_nonzero", id), value)?,
353                evicted_time: parse_field(&format!("items:{}:evicted_time", id), value)?,
354                out_of_memory: parse_field(&format!("items:{}:outofmemory", id), value)?,
355                tail_repairs: parse_field(&format!("items:{}:tailrepairs", id), value)?,
356                reclaimed: parse_field(&format!("items:{}:reclaimed", id), value)?,
357                expired_unfetched: parse_field(&format!("items:{}:expired_unfetched", id), value)?,
358                evicted_unfetched: parse_field(&format!("items:{}:evicted_unfetched", id), value)?,
359                evicted_active: parse_field(&format!("items:{}:evicted_active", id), value)?,
360                crawler_reclaimed: parse_field(&format!("items:{}:crawler_reclaimed", id), value)?,
361                crawler_items_checked: parse_field(&format!("items:{}:crawler_items_checked", id), value)?,
362                lrutail_reflocked: parse_field(&format!("items:{}:lrutail_reflocked", id), value)?,
363                moves_to_cold: parse_field(&format!("items:{}:moves_to_cold", id), value)?,
364                moves_to_warm: parse_field(&format!("items:{}:moves_to_warm", id), value)?,
365                moves_within_lru: parse_field(&format!("items:{}:moves_within_lru", id), value)?,
366                direct_reclaims: parse_field(&format!("items:{}:direct_reclaims", id), value)?,
367                hits_to_hot: parse_field(&format!("items:{}:hits_to_hot", id), value)?,
368                hits_to_warm: parse_field(&format!("items:{}:hits_to_warm", id), value)?,
369                hits_to_cold: parse_field(&format!("items:{}:hits_to_cold", id), value)?,
370                hits_to_temp: parse_field(&format!("items:{}:hits_to_temp", id), value)?,
371            })
372        }
373
374        Ok(Self { items })
375    }
376}
377
/// Metadata about a single cached item as reported by the crawler metadump.
///
/// `key`, `expires` and `size` are parsed from the `key`, `exp` and `size`
/// pairs of a metadump line respectively.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Meta {
    pub key: String,
    pub expires: i64,
    pub size: u64,
}

impl Meta {
    // Each metadump line holds several key-value pairs, of which only a few are
    // needed to build a `Meta`. Listing them here lets the parser skip decoding
    // and storing everything else in the server response.
    const KEYS: &'static [&'static str] = &["key", "exp", "size"];
}
391
392impl TryFrom<&HashMap<String, String>> for Meta {
393    type Error = MtopError;
394
395    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
396        Ok(Meta {
397            key: parse_field("key", value)?,
398            expires: parse_field("exp", value)?,
399            size: parse_field("size", value)?,
400        })
401    }
402}
403
404impl PartialOrd for Meta {
405    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
406        Some(self.cmp(other))
407    }
408}
409
410impl Ord for Meta {
411    fn cmp(&self, other: &Self) -> Ordering {
412        self.key.cmp(&other.key)
413    }
414}
415
/// A cached value together with the key, CAS and flags reported on its
/// `VALUE` response line.
///
/// Equality and hashing consider every field, while ordering only looks at
/// `key`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Value {
    pub key: String,
    pub cas: u64,
    pub flags: u64,
    pub data: Vec<u8>,
}

impl PartialOrd for Value {
    /// Delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for Value {
    /// Values are ordered by `key` alone.
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.key, &rhs.key)
    }
}
435
436fn parse_field<T>(key: &str, map: &HashMap<String, String>) -> Result<T, MtopError>
437where
438    T: FromStr,
439    <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
440{
441    map.get(key)
442        .ok_or_else(|| MtopError::runtime(format!("field {} missing", key)))
443        .and_then(|v| {
444            v.parse()
445                .map_err(|e| MtopError::runtime_cause(format!("field {} value '{}'", key, v), e))
446        })
447}
448
449fn parse_value<T>(val: &str, line: &str) -> Result<T, MtopError>
450where
451    T: FromStr + fmt::Display,
452    <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
453{
454    val.parse()
455        .map_err(|e| MtopError::runtime_cause(format!("parsing {} from '{}'", val, line), e))
456}
457
/// Broad category of an [`MtopError`].
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
pub enum ErrorKind {
    Runtime,
    IO,
    Protocol,
    Configuration,
}

impl fmt::Display for ErrorKind {
    /// Writes a short lowercase label for the kind. Width and padding flags of
    /// the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Runtime => "runtime error",
            Self::IO => "io error",
            Self::Protocol => "protocol error",
            Self::Configuration => "configuration error",
        };

        f.write_str(label)
    }
}
476
477#[derive(Debug)]
478enum ErrorRepr {
479    Message(String),
480    Cause(Box<dyn error::Error + Send + Sync + 'static>),
481    MessageCause(String, Box<dyn error::Error + Send + Sync + 'static>),
482}
483
484#[derive(Debug)]
485pub struct MtopError {
486    kind: ErrorKind,
487    repr: ErrorRepr,
488}
489
490impl MtopError {
491    pub fn runtime<S>(msg: S) -> MtopError
492    where
493        S: Into<String>,
494    {
495        MtopError {
496            kind: ErrorKind::Runtime,
497            repr: ErrorRepr::Message(msg.into()),
498        }
499    }
500
501    pub fn runtime_cause<S, E>(msg: S, e: E) -> MtopError
502    where
503        S: Into<String>,
504        E: error::Error + Send + Sync + 'static,
505    {
506        MtopError {
507            kind: ErrorKind::Runtime,
508            repr: ErrorRepr::MessageCause(msg.into(), Box::new(e)),
509        }
510    }
511
512    pub fn configuration<S>(msg: S) -> MtopError
513    where
514        S: Into<String>,
515    {
516        MtopError {
517            kind: ErrorKind::Configuration,
518            repr: ErrorRepr::Message(msg.into()),
519        }
520    }
521
522    pub fn configuration_cause<S, E>(msg: S, e: E) -> MtopError
523    where
524        S: Into<String>,
525        E: error::Error + Send + Sync + 'static,
526    {
527        MtopError {
528            kind: ErrorKind::Configuration,
529            repr: ErrorRepr::MessageCause(msg.into(), Box::new(e)),
530        }
531    }
532
533    pub fn timeout<D>(t: Duration, operation: D) -> MtopError
534    where
535        D: fmt::Display,
536    {
537        MtopError {
538            kind: ErrorKind::IO,
539            repr: ErrorRepr::Message(format!("operation {} timed out after {:?}", operation, t)),
540        }
541    }
542
543    pub fn kind(&self) -> ErrorKind {
544        self.kind
545    }
546}
547
548impl fmt::Display for MtopError {
549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550        match &self.repr {
551            ErrorRepr::Message(msg) => write!(f, "{}: {}", self.kind, msg),
552            ErrorRepr::Cause(e) => write!(f, "{}: {}", self.kind, e),
553            ErrorRepr::MessageCause(msg, e) => write!(f, "{}: {}: {}", self.kind, msg, e),
554        }
555    }
556}
557
558impl error::Error for MtopError {
559    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
560        match &self.repr {
561            ErrorRepr::Message(_) => None,
562            ErrorRepr::Cause(e) => Some(e.as_ref()),
563            ErrorRepr::MessageCause(_, e) => Some(e.as_ref()),
564        }
565    }
566}
567
568impl From<(String, io::Error)> for MtopError {
569    fn from((s, e): (String, io::Error)) -> Self {
570        MtopError {
571            kind: ErrorKind::IO,
572            repr: ErrorRepr::MessageCause(s, Box::new(e)),
573        }
574    }
575}
576
577impl From<io::Error> for MtopError {
578    fn from(e: io::Error) -> Self {
579        MtopError {
580            kind: ErrorKind::IO,
581            repr: ErrorRepr::Cause(Box::new(e)),
582        }
583    }
584}
585
586impl From<ProtocolError> for MtopError {
587    fn from(e: ProtocolError) -> Self {
588        MtopError {
589            kind: ErrorKind::Protocol,
590            repr: ErrorRepr::Cause(Box::new(e)),
591        }
592    }
593}
594
/// Category of an error response, displayed as the corresponding protocol
/// keyword (for example `NOT_FOUND` or `SERVER_ERROR`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ProtocolErrorKind {
    BadClass,
    Busy,
    Client,
    NotFound,
    NotStored,
    Server,
    Syntax,
}

impl fmt::Display for ProtocolErrorKind {
    /// Writes the protocol keyword for this kind, honoring any width,
    /// alignment or precision flags of the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::BadClass => "BADCLASS",
            Self::Busy => "BUSY",
            Self::Client => "CLIENT_ERROR",
            Self::NotFound => "NOT_FOUND",
            Self::NotStored => "NOT_STORED",
            Self::Server => "SERVER_ERROR",
            Self::Syntax => "ERROR",
        };

        // `pad` is what `Display for str` does, so padding flags are applied
        // exactly as they would be when formatting the string itself.
        f.pad(keyword)
    }
}
619
620#[derive(Debug)]
621pub struct ProtocolError {
622    kind: ProtocolErrorKind,
623    message: Option<String>,
624}
625
626impl ProtocolError {
627    pub fn kind(&self) -> ProtocolErrorKind {
628        self.kind
629    }
630}
631
632impl fmt::Display for ProtocolError {
633    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
634        if let Some(msg) = &self.message {
635            write!(f, "{} {}", self.kind, msg)
636        } else {
637            write!(f, "{}", self.kind)
638        }
639    }
640}
641
642impl error::Error for ProtocolError {}
643
644#[derive(Debug, Eq, PartialEq, Clone)]
645enum Command<'a> {
646    Add(&'a Key, u64, u32, &'a [u8]),
647    CrawlerMetadump,
648    Decr(&'a Key, u64),
649    Delete(&'a Key),
650    FlushAll,
651    FlushAllWait(u64),
652    Gets(&'a [Key]),
653    Incr(&'a Key, u64),
654    Replace(&'a Key, u64, u32, &'a [u8]),
655    Stats,
656    StatsItems,
657    StatsSlabs,
658    Set(&'a Key, u64, u32, &'a [u8]),
659    Touch(&'a Key, u32),
660    Version,
661}
662
663impl<'a> From<Command<'a>> for Vec<u8> {
664    fn from(value: Command<'a>) -> Self {
665        match value {
666            Command::Add(key, flags, ttl, data) => storage_command("add", key, flags, ttl, data),
667            Command::CrawlerMetadump => "lru_crawler metadump hash\r\n".to_owned().into_bytes(),
668            Command::Decr(key, delta) => format!("decr {} {}\r\n", key, delta).into_bytes(),
669            Command::Delete(key) => format!("delete {}\r\n", key).into_bytes(),
670            Command::FlushAll => "flush_all\r\n".to_owned().into_bytes(),
671            Command::FlushAllWait(wait) => format!("flush_all {}\r\n", wait).into_bytes(),
672            Command::Gets(keys) => format!("gets {}\r\n", keys.join(" ")).into_bytes(),
673            Command::Incr(key, delta) => format!("incr {} {}\r\n", key, delta).into_bytes(),
674            Command::Replace(key, flags, ttl, data) => storage_command("replace", key, flags, ttl, data),
675            Command::Stats => "stats\r\n".to_owned().into_bytes(),
676            Command::StatsItems => "stats items\r\n".to_owned().into_bytes(),
677            Command::StatsSlabs => "stats slabs\r\n".to_owned().into_bytes(),
678            Command::Set(key, flags, ttl, data) => storage_command("set", key, flags, ttl, data),
679            Command::Touch(key, ttl) => format!("touch {} {}\r\n", key, ttl).into_bytes(),
680            Command::Version => "version\r\n".to_owned().into_bytes(),
681        }
682    }
683}
684
685fn storage_command(verb: &str, key: &Key, flags: u64, ttl: u32, data: &[u8]) -> Vec<u8> {
686    let mut bytes = Vec::with_capacity(key.len() + data.len() + 32);
687    io::Write::write_all(
688        &mut bytes,
689        format!("{} {} {} {} {}\r\n", verb, key, flags, ttl, data.len()).as_bytes(),
690    )
691    .unwrap();
692    io::Write::write_all(&mut bytes, data).unwrap();
693    io::Write::write_all(&mut bytes, "\r\n".as_bytes()).unwrap();
694    bytes
695}
696
/// Client for a single Memcached connection, built from a read half and a
/// write half supplied by the caller.
pub struct Memcached {
    // Line-oriented, buffered view of the read half. Responses are consumed one
    // line at a time; the underlying reader is borrowed directly when a raw data
    // block has to be read.
    read: Lines<BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>>,
    // Buffered write half that commands are written to.
    write: BufWriter<Box<dyn AsyncWrite + Send + Sync + Unpin>>,
}
701
702impl Memcached {
703    const MAX_PAYLOAD_SIZE: u64 = 1024 * 1024 * 1024;
704
705    pub fn new<R, W>(read: R, write: W) -> Self
706    where
707        R: AsyncRead + Send + Sync + Unpin + 'static,
708        W: AsyncWrite + Send + Sync + Unpin + 'static,
709    {
710        Memcached {
711            read: BufReader::<Box<dyn AsyncRead + Send + Sync + Unpin>>::new(Box::new(read)).lines(),
712            write: BufWriter::new(Box::new(write)),
713        }
714    }
715
716    /// Get a `Stats` object with the current values of the interesting stats for the server.
717    pub async fn stats(&mut self) -> Result<Stats, MtopError> {
718        self.send(Command::Stats).await?;
719        let raw = self.read_stats_response().await?;
720        Stats::try_from(&raw)
721    }
722
723    /// Get a `Slabs` object with information about each set of `Slab`s maintained by
724    /// the Memcached server. You can think of each `Slab` as a class of objects that
725    /// are stored together in memory. Note that `Slab` IDs may not be contiguous based
726    /// on the size of items actually stored by the server.
727    pub async fn slabs(&mut self) -> Result<Slabs, MtopError> {
728        self.send(Command::StatsSlabs).await?;
729        let raw = self.read_stats_response().await?;
730        Slabs::try_from(&raw)
731    }
732
733    /// Get a `SlabsItems` object with information about the `SlabItem` items stored in
734    /// each slab class maintained by the Memcached server. The ID of each `SlabItem`
735    /// corresponds to a `Slab` maintained by the server. Note that `SlabItem` IDs may
736    /// not be contiguous based on the size of items actually stored by the server.
737    pub async fn items(&mut self) -> Result<SlabItems, MtopError> {
738        self.send(Command::StatsItems).await?;
739        let raw = self.read_stats_response().await?;
740        SlabItems::try_from(&raw)
741    }
742
743    async fn read_stats_response(&mut self) -> Result<HashMap<String, String>, MtopError> {
744        let mut out = HashMap::new();
745
746        while let Some(v) = self.read.next_line().await? {
747            if v == "END" {
748                break;
749            }
750
751            let (key, val) = Self::parse_stat_line(&v)?;
752            out.insert(key.to_owned(), val.to_owned());
753        }
754
755        Ok(out)
756    }
757
758    fn parse_stat_line(line: &str) -> Result<(&str, &str), MtopError> {
759        let mut parts = line.splitn(3, ' ');
760        match (parts.next(), parts.next(), parts.next()) {
761            (Some("STAT"), Some(key), Some(val)) => Ok((key, val)),
762            _ => {
763                if let Some(err) = Self::parse_error(line) {
764                    Err(MtopError::from(err))
765                } else {
766                    Err(MtopError::runtime(format!("unable to parse '{}'", line)))
767                }
768            }
769        }
770    }
771
772    /// Get a `Meta` object for every item in the cache which includes its key and expiration
773    /// time as a UNIX timestamp. Expiration time will be `-1` if the item was set with an
774    /// infinite TTL.
775    pub async fn metas(&mut self) -> Result<Vec<Meta>, MtopError> {
776        self.send(Command::CrawlerMetadump).await?;
777        let mut out = Vec::new();
778        let mut raw = HashMap::new();
779
780        while let Some(v) = self.read.next_line().await? {
781            if v == "END" {
782                break;
783            }
784
785            // Check for an error first because the `metadump` command doesn't
786            // have any sort of prefix for each result line like `STAT` or `VALUE`
787            // so it's hard to know if it's valid without looking for an error.
788            if let Some(err) = Self::parse_error(&v) {
789                return Err(MtopError::from(err));
790            }
791
792            let item = Self::parse_crawler_meta(&v, Meta::KEYS, &mut raw)?;
793            out.push(item);
794        }
795
796        Ok(out)
797    }
798
799    fn parse_crawler_meta(line: &str, keys: &[&str], raw: &mut HashMap<String, String>) -> Result<Meta, MtopError> {
800        // Avoid allocating a new HashMap to parse every meta entry just to throw it away
801        raw.clear();
802
803        for p in line.split(' ') {
804            let (key, val) = p
805                .split_once('=')
806                .ok_or_else(|| MtopError::runtime(format!("unexpected metadump format '{}'", line)))?;
807
808            // Avoid spending time decoding values or allocating for data we don't care about.
809            // Use a slice here since it's faster than a HashSet when the number of entries is
810            // small and the number of keys we're searching through is always small.
811            if !keys.contains(&key) {
812                continue;
813            }
814
815            let decoded = urlencoding::decode(val)
816                .map_err(|e| MtopError::runtime_cause(format!("unexpected metadump encoding '{}'", line), e))?;
817            raw.insert(key.to_owned(), decoded.into_owned());
818        }
819
820        Meta::try_from(raw.deref())
821    }
822
823    /// Send a simple command to verify our connection to the server is working.
824    pub async fn ping(&mut self) -> Result<(), MtopError> {
825        self.send(Command::Version).await?;
826        if let Some(v) = self.read.next_line().await? {
827            if let Some(e) = Self::parse_error(&v) {
828                return Err(MtopError::from(e));
829            }
830            if !v.starts_with("VERSION") {
831                return Err(MtopError::runtime(format!("unable to parse '{}'", v)));
832            }
833        }
834
835        Ok(())
836    }
837
838    /// Flush all entries in the cache, optionally after a delay. When a delay is used, the
839    /// server will flush entries after a delay but the call will still return immediately.
840    pub async fn flush_all(&mut self, wait: Option<Duration>) -> Result<(), MtopError> {
841        let cmd = if let Some(d) = wait {
842            Command::FlushAllWait(d.as_secs())
843        } else {
844            Command::FlushAll
845        };
846
847        self.send(cmd).await?;
848        self.read_simple_response("OK").await
849    }
850
851    /// Get a map of the requested keys and their corresponding `Value` in the cache
852    /// including the key, flags, and data.
853    pub async fn get(&mut self, keys: &[Key]) -> Result<HashMap<String, Value>, MtopError> {
854        self.send(Command::Gets(keys)).await?;
855        let mut out = HashMap::with_capacity(keys.len());
856
857        while let Some(v) = self.read.next_line().await? {
858            if v == "END" {
859                break;
860            }
861
862            let value = self.parse_gets_value(&v).await?;
863            out.insert(value.key.clone(), value);
864        }
865
866        Ok(out)
867    }
868
869    async fn parse_gets_value(&mut self, line: &str) -> Result<Value, MtopError> {
870        let mut parts = line.splitn(5, ' ');
871
872        match (parts.next(), parts.next(), parts.next(), parts.next(), parts.next()) {
873            (Some("VALUE"), Some(k), Some(flags), Some(len), Some(cas)) => {
874                let flags: u64 = parse_value(flags, line)?;
875                let len: u64 = parse_value(len, line)?;
876                let cas: u64 = parse_value(cas, line)?;
877
878                // The max size of an object in Memcached is represented with a `u64` which
879                // means it's basically infinite. In practice the default max size of an object
880                // in a Memcached server is 1MB but can be configured higher. Place a limit on
881                // the size that we'll accept here to avoid a denial of service from bad lengths.
882                if len > Self::MAX_PAYLOAD_SIZE {
883                    return Err(MtopError::runtime(format!(
884                        "server response of length {} exceeds client max of {}",
885                        len,
886                        Self::MAX_PAYLOAD_SIZE
887                    )));
888                }
889
890                // Two extra bytes to read the trailing \r\n but then truncate them.
891                let mut data = Vec::with_capacity(len as usize + 2);
892                let reader = self.read.get_mut();
893                reader.take(len + 2).read_to_end(&mut data).await?;
894                data.truncate(len as usize);
895
896                Ok(Value {
897                    key: k.to_owned(),
898                    flags,
899                    cas,
900                    data,
901                })
902            }
903            _ => {
904                // Response doesn't look like a `VALUE` line, see if the server has
905                // responded with an error that we can parse. Otherwise, consider this
906                // an internal error.
907                if let Some(err) = Self::parse_error(line) {
908                    Err(MtopError::from(err))
909                } else {
910                    Err(MtopError::runtime(format!("unable to parse '{}'", line)))
911                }
912            }
913        }
914    }
915
916    /// Increment the value of a key by the given delta if the value is numeric returning
917    /// the new value. Returns an error if the value is _not_ numeric.
918    pub async fn incr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
919        self.send(Command::Incr(key, delta)).await?;
920        if let Some(v) = self.read.next_line().await? {
921            Self::parse_numeric_response(&v)
922        } else {
923            Err(MtopError::runtime("unexpected empty response"))
924        }
925    }
926
927    /// Decrement the value of a key by the given delta if the value is numeric returning
928    /// the new value with a minimum of 0. Returns an error if the value is _not_ numeric.
929    pub async fn decr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
930        self.send(Command::Decr(key, delta)).await?;
931        if let Some(v) = self.read.next_line().await? {
932            Self::parse_numeric_response(&v)
933        } else {
934            Err(MtopError::runtime("unexpected empty response"))
935        }
936    }
937
938    fn parse_numeric_response(line: &str) -> Result<u64, MtopError> {
939        if let Some(err) = Self::parse_error(line) {
940            Err(MtopError::from(err))
941        } else {
942            line.parse()
943                .map_err(|_e| MtopError::runtime(format!("unable to parse '{}'", line)))
944        }
945    }
946
947    /// Store the provided item in the cache, regardless of whether it already exists.
948    pub async fn set<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
949    where
950        V: AsRef<[u8]>,
951    {
952        self.send(Command::Set(key, flags, ttl, data.as_ref())).await?;
953        self.read_simple_response("STORED").await
954    }
955
956    /// Store the provided item in the cache only if it does not already exist.
957    pub async fn add<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
958    where
959        V: AsRef<[u8]>,
960    {
961        self.send(Command::Add(key, flags, ttl, data.as_ref())).await?;
962        self.read_simple_response("STORED").await
963    }
964
965    /// Store the provided item in the cache only if it already exists.
966    pub async fn replace<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
967    where
968        V: AsRef<[u8]>,
969    {
970        self.send(Command::Replace(key, flags, ttl, data.as_ref())).await?;
971        self.read_simple_response("STORED").await
972    }
973
974    /// Update the TTL of an item in the cache if it exists, return an error otherwise.
975    pub async fn touch(&mut self, key: &Key, ttl: u32) -> Result<(), MtopError> {
976        self.send(Command::Touch(key, ttl)).await?;
977        self.read_simple_response("TOUCHED").await
978    }
979
980    /// Delete an item in the cache if it exists, return an error otherwise.
981    pub async fn delete(&mut self, key: &Key) -> Result<(), MtopError> {
982        self.send(Command::Delete(key)).await?;
983        self.read_simple_response("DELETED").await
984    }
985
986    async fn read_simple_response(&mut self, expected: &str) -> Result<(), MtopError> {
987        match self.read.next_line().await? {
988            Some(line) if line == expected => Ok(()),
989            Some(line) => {
990                if let Some(err) = Self::parse_error(&line) {
991                    Err(MtopError::from(err))
992                } else {
993                    Err(MtopError::runtime(format!("unable to parse '{}'", line)))
994                }
995            }
996            None => Err(MtopError::runtime("unexpected empty response")),
997        }
998    }
999
1000    fn parse_error(line: &str) -> Option<ProtocolError> {
1001        let mut values = line.splitn(2, ' ');
1002        let (kind, message) = match (values.next(), values.next()) {
1003            (Some("BADCLASS"), Some(msg)) => (ProtocolErrorKind::BadClass, Some(msg.to_owned())),
1004            (Some("BUSY"), Some(msg)) => (ProtocolErrorKind::Busy, Some(msg.to_owned())),
1005            (Some("CLIENT_ERROR"), Some(msg)) => (ProtocolErrorKind::Client, Some(msg.to_owned())),
1006            (Some("ERROR"), None) => (ProtocolErrorKind::Syntax, None),
1007            (Some("ERROR"), Some(msg)) => (ProtocolErrorKind::Syntax, Some(msg.to_owned())),
1008            (Some("NOT_FOUND"), None) => (ProtocolErrorKind::NotFound, None),
1009            (Some("NOT_STORED"), None) => (ProtocolErrorKind::NotStored, None),
1010            (Some("SERVER_ERROR"), Some(msg)) => (ProtocolErrorKind::Server, Some(msg.to_owned())),
1011
1012            _ => return None,
1013        };
1014
1015        Some(ProtocolError { kind, message })
1016    }
1017
1018    async fn send(&mut self, cmd: Command<'_>) -> Result<(), MtopError> {
1019        let cmd_bytes: Vec<u8> = cmd.into();
1020        self.write.write_all(&cmd_bytes).await?;
1021        Ok(self.write.flush().await?)
1022    }
1023}
1024
1025impl fmt::Debug for Memcached {
1026    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1027        f.debug_struct("Memcached")
1028            .field("read", &"...")
1029            .field("write", &"...")
1030            .finish()
1031    }
1032}
1033
/// A cache key wrapped around its `String` form. Instances are built with
/// `Key::one` or `Key::many`, which validate the value first.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct Key(String);
1037
1038impl Key {
1039    const MAX_LENGTH: usize = 250;
1040
1041    pub fn one<T>(val: T) -> Result<Key, MtopError>
1042    where
1043        T: Into<String>,
1044    {
1045        let val = val.into();
1046        if !Self::is_legal_val(&val) {
1047            Err(MtopError::runtime(format!("invalid key {}", val)))
1048        } else {
1049            Ok(Key(val))
1050        }
1051    }
1052
1053    pub fn many<I, T>(vals: I) -> Result<Vec<Key>, MtopError>
1054    where
1055        I: IntoIterator<Item = T>,
1056        T: Into<String>,
1057    {
1058        let iter = vals.into_iter();
1059        let (sz, _) = iter.size_hint();
1060        let mut out = Vec::with_capacity(sz);
1061
1062        for val in iter {
1063            out.push(Self::one(val)?);
1064        }
1065
1066        Ok(out)
1067    }
1068
1069    pub fn len(&self) -> usize {
1070        self.0.len()
1071    }
1072
1073    pub fn is_empty(&self) -> bool {
1074        self.0.is_empty()
1075    }
1076
1077    fn is_legal_val(val: &str) -> bool {
1078        if val.len() > Self::MAX_LENGTH {
1079            return false;
1080        }
1081
1082        for c in val.chars() {
1083            if !c.is_ascii() || c.is_ascii_whitespace() || c.is_ascii_control() {
1084                return false;
1085            }
1086        }
1087
1088        true
1089    }
1090}
1091
1092impl fmt::Display for Key {
1093    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094        self.0.fmt(f)
1095    }
1096}
1097
1098impl AsRef<str> for Key {
1099    fn as_ref(&self) -> &str {
1100        &self.0
1101    }
1102}
1103
1104impl Borrow<str> for Key {
1105    fn borrow(&self) -> &str {
1106        &self.0
1107    }
1108}
1109
1110#[cfg(test)]
1111mod test {
1112    use super::{ErrorKind, Key, Memcached, Meta, Slab, SlabItem, SlabItems};
1113    use std::io::{Cursor, Error};
1114    use std::pin::Pin;
1115    use std::task::{Context, Poll};
1116    use std::time::Duration;
1117    use tokio::io::AsyncWrite;
1118    use tokio::sync::mpsc::{self, UnboundedSender};
1119
1120    /////////
1121    // key //
1122    /////////
1123
1124    #[test]
1125    fn test_key_one_length() {
1126        let val = "abc".repeat(Key::MAX_LENGTH);
1127        let res = Key::one(val);
1128        assert!(res.is_err());
1129    }
1130
1131    #[test]
1132    fn test_key_one_non_ascii() {
1133        let val = "🤦";
1134        let res = Key::one(val);
1135        assert!(res.is_err());
1136    }
1137
1138    #[test]
1139    fn test_key_one_whitespace() {
1140        let val = "some thing";
1141        let res = Key::one(val);
1142        assert!(res.is_err());
1143    }
1144
1145    #[test]
1146    fn test_key_one_control_char() {
1147        let val = "\x7F";
1148        let res = Key::one(val);
1149        assert!(res.is_err());
1150    }
1151
1152    #[test]
1153    fn test_key_one_success() {
1154        let val = "a-reasonable-key";
1155        let res = Key::one(val);
1156        assert!(res.is_ok());
1157    }
1158
1159    struct WriteAdapter {
1160        tx: UnboundedSender<Vec<u8>>,
1161    }
1162
1163    impl AsyncWrite for WriteAdapter {
1164        fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
1165            self.tx.send(buf.to_owned()).unwrap();
1166            Poll::Ready(Ok(buf.len()))
1167        }
1168
1169        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1170            Poll::Ready(Ok(()))
1171        }
1172
1173        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1174            Poll::Ready(Ok(()))
1175        }
1176    }
1177
1178    /// Create a new receiver channel and `Memcached` instance to read the provided server
1179    /// response. Anything written by the client is able to be read from the receiver channel.
1180    /// NOTE that it is important that the receiver not be dropped by the caller since this will
1181    /// cause writes to the channel to fail from within the client.
1182    macro_rules! client {
1183        () => ({
1184            let (tx, rx) = mpsc::unbounded_channel();
1185            let reads = Vec::new();
1186            (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
1187        });
1188        ($($line:expr),+ $(,)?) => ({
1189            let (tx, rx) = mpsc::unbounded_channel();
1190            let mut reads = Vec::new();
1191            $(reads.extend_from_slice($line.as_bytes());)+
1192            (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
1193        })
1194    }
1195
1196    /////////
1197    // get //
1198    /////////
1199
1200    #[tokio::test]
1201    async fn test_memcached_get_no_key() {
1202        let (_rx, mut client) = client!();
1203        let vals: Vec<String> = vec![];
1204        let keys = Key::many(vals).unwrap();
1205        let res = client.get(&keys).await.unwrap();
1206
1207        assert!(res.is_empty());
1208    }
1209
1210    #[tokio::test]
1211    async fn test_memcached_get_error() {
1212        let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1213        let keys = Key::many(vec!["foo", "baz"]).unwrap();
1214        let res = client.get(&keys).await;
1215
1216        assert!(res.is_err());
1217        let err = res.unwrap_err();
1218        assert_eq!(ErrorKind::Protocol, err.kind());
1219    }
1220
1221    #[tokio::test]
1222    async fn test_memcached_get_miss() {
1223        let (_rx, mut client) = client!("END\r\n");
1224        let keys = Key::many(vec!["foo", "baz"]).unwrap();
1225        let res = client.get(&keys).await.unwrap();
1226
1227        assert!(res.is_empty());
1228    }
1229
1230    #[tokio::test]
1231    async fn test_memcached_get_hit() {
1232        let (_rx, mut client) = client!(
1233            "VALUE foo 32 3 1\r\n",
1234            "bar\r\n",
1235            "VALUE baz 64 3 2\r\n",
1236            "qux\r\n",
1237            "END\r\n",
1238        );
1239        let keys = Key::many(vec!["foo", "baz"]).unwrap();
1240        let res = client.get(&keys).await.unwrap();
1241
1242        let val1 = res.get("foo").unwrap();
1243        assert_eq!("foo", val1.key);
1244        assert_eq!("bar".as_bytes(), val1.data);
1245        assert_eq!(32, val1.flags);
1246        assert_eq!(1, val1.cas);
1247
1248        let val2 = res.get("baz").unwrap();
1249        assert_eq!("baz", val2.key);
1250        assert_eq!("qux".as_bytes(), val2.data);
1251        assert_eq!(64, val2.flags);
1252        assert_eq!(2, val2.cas);
1253    }
1254
1255    //////////
1256    // incr //
1257    //////////
1258
1259    #[tokio::test]
1260    async fn test_memcached_incr_bad_val() {
1261        let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1262        let key = Key::one("test").unwrap();
1263        let res = client.incr(&key, 2).await;
1264
1265        assert!(res.is_err());
1266        let err = res.unwrap_err();
1267        assert_eq!(ErrorKind::Protocol, err.kind());
1268
1269        let bytes = rx.recv().await.unwrap();
1270        let command = String::from_utf8(bytes).unwrap();
1271        assert_eq!("incr test 2\r\n", command);
1272    }
1273
1274    #[tokio::test]
1275    async fn test_memcached_incr_success() {
1276        let (mut rx, mut client) = client!("3\r\n");
1277        let key = Key::one("test").unwrap();
1278        let res = client.incr(&key, 2).await.unwrap();
1279
1280        assert_eq!(3, res);
1281        let bytes = rx.recv().await.unwrap();
1282        let command = String::from_utf8(bytes).unwrap();
1283        assert_eq!("incr test 2\r\n", command);
1284    }
1285
1286    //////////
1287    // decr //
1288    //////////
1289
1290    #[tokio::test]
1291    async fn test_memcached_decr_bad_val() {
1292        let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1293        let key = Key::one("test").unwrap();
1294        let res = client.decr(&key, 1).await;
1295
1296        assert!(res.is_err());
1297        let err = res.unwrap_err();
1298        assert_eq!(ErrorKind::Protocol, err.kind());
1299
1300        let bytes = rx.recv().await.unwrap();
1301        let command = String::from_utf8(bytes).unwrap();
1302        assert_eq!("decr test 1\r\n", command);
1303    }
1304
1305    #[tokio::test]
1306    async fn test_memcached_decr_success() {
1307        let (mut rx, mut client) = client!("3\r\n");
1308        let key = Key::one("test").unwrap();
1309        let res = client.decr(&key, 1).await.unwrap();
1310
1311        assert_eq!(3, res);
1312        let bytes = rx.recv().await.unwrap();
1313        let command = String::from_utf8(bytes).unwrap();
1314        assert_eq!("decr test 1\r\n", command);
1315    }
1316
1317    //////////
1318    // ping //
1319    //////////
1320
1321    #[tokio::test]
1322    async fn test_memcached_ping_bad_val() {
1323        let (mut rx, mut client) = client!("220 localhost ESMTP Postfix\r\n");
1324        let res = client.ping().await;
1325
1326        assert!(res.is_err());
1327        let err = res.unwrap_err();
1328        assert_eq!(ErrorKind::Runtime, err.kind());
1329
1330        let bytes = rx.recv().await.unwrap();
1331        let command = String::from_utf8(bytes).unwrap();
1332        assert_eq!("version\r\n", command);
1333    }
1334
1335    #[tokio::test]
1336    async fn test_memcached_ping_success() {
1337        let (mut rx, mut client) = client!("VERSION 1.6.22\r\n");
1338        client.ping().await.unwrap();
1339
1340        let bytes = rx.recv().await.unwrap();
1341        let command = String::from_utf8(bytes).unwrap();
1342        assert_eq!("version\r\n", command);
1343    }
1344
1345    macro_rules! test_store_command_success {
1346        ($method:ident, $verb:expr) => {
1347            let (mut rx, mut client) = client!("STORED\r\n");
1348            let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1349
1350            assert!(res.is_ok());
1351            let bytes = rx.recv().await.unwrap();
1352            let command = String::from_utf8(bytes).unwrap();
1353            assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1354        };
1355    }
1356
1357    macro_rules! test_store_command_error {
1358        ($method:ident, $verb:expr) => {
1359            let (mut rx, mut client) = client!("NOT_STORED\r\n");
1360            let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1361
1362            assert!(res.is_err());
1363            let err = res.unwrap_err();
1364            assert_eq!(ErrorKind::Protocol, err.kind());
1365
1366            let bytes = rx.recv().await.unwrap();
1367            let command = String::from_utf8(bytes).unwrap();
1368            assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1369        };
1370    }
1371
1372    /////////
1373    // set //
1374    /////////
1375
    /// `set` succeeds on a `STORED` reply and writes a `set` command with the key,
    /// flags, TTL, length, and payload.
    #[tokio::test]
    async fn test_memcached_set_success() {
        test_store_command_success!(set, "set");
    }
1380
    /// `set` returns a protocol error on a `NOT_STORED` reply; the `set` command is
    /// still written.
    #[tokio::test]
    async fn test_memcached_set_error() {
        test_store_command_error!(set, "set");
    }
1385
1386    /////////
1387    // add //
1388    /////////
1389
    /// `add` succeeds on a `STORED` reply and writes an `add` command with the key,
    /// flags, TTL, length, and payload.
    #[tokio::test]
    async fn test_memcached_add_success() {
        test_store_command_success!(add, "add");
    }
1394
    /// `add` returns a protocol error on a `NOT_STORED` reply; the `add` command is
    /// still written.
    #[tokio::test]
    async fn test_memcached_add_error() {
        test_store_command_error!(add, "add");
    }
1399
1400    /////////////
1401    // replace //
1402    /////////////
1403
    /// `replace` succeeds on a `STORED` reply and writes a `replace` command with the
    /// key, flags, TTL, length, and payload.
    #[tokio::test]
    async fn test_memcached_replace_success() {
        test_store_command_success!(replace, "replace");
    }
1408
    /// `replace` returns a protocol error on a `NOT_STORED` reply; the `replace`
    /// command is still written.
    #[tokio::test]
    async fn test_memcached_replace_error() {
        test_store_command_error!(replace, "replace");
    }
1413
1414    ///////////
1415    // touch //
1416    ///////////
1417
1418    #[tokio::test]
1419    async fn test_memcached_touch_success() {
1420        let (mut rx, mut client) = client!("TOUCHED\r\n");
1421        let key = Key::one("test").unwrap();
1422        let res = client.touch(&key, 300).await;
1423
1424        assert!(res.is_ok());
1425        let bytes = rx.recv().await.unwrap();
1426        let command = String::from_utf8(bytes).unwrap();
1427        assert_eq!("touch test 300\r\n", command);
1428    }
1429
1430    #[tokio::test]
1431    async fn test_memcached_touch_error() {
1432        let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1433        let key = Key::one("test").unwrap();
1434        let res = client.touch(&key, 300).await;
1435
1436        assert!(res.is_err());
1437        let err = res.unwrap_err();
1438        assert_eq!(ErrorKind::Protocol, err.kind());
1439
1440        let bytes = rx.recv().await.unwrap();
1441        let command = String::from_utf8(bytes).unwrap();
1442        assert_eq!("touch test 300\r\n", command);
1443    }
1444
1445    ////////////
1446    // delete //
1447    ////////////
1448
1449    #[tokio::test]
1450    async fn test_memcached_delete_success() {
1451        let (mut rx, mut client) = client!("DELETED\r\n");
1452        let key = Key::one("test").unwrap();
1453        let res = client.delete(&key).await;
1454
1455        assert!(res.is_ok());
1456        let bytes = rx.recv().await.unwrap();
1457        let command = String::from_utf8(bytes).unwrap();
1458        assert_eq!("delete test\r\n", command);
1459    }
1460
1461    #[tokio::test]
1462    async fn test_memcached_delete_error() {
1463        let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1464        let key = Key::one("test").unwrap();
1465        let res = client.delete(&key).await;
1466
1467        assert!(res.is_err());
1468        let err = res.unwrap_err();
1469        assert_eq!(ErrorKind::Protocol, err.kind());
1470
1471        let bytes = rx.recv().await.unwrap();
1472        let command = String::from_utf8(bytes).unwrap();
1473        assert_eq!("delete test\r\n", command);
1474    }
1475
1476    ///////////////
1477    // flush_all //
1478    ///////////////
1479
1480    #[tokio::test]
1481    async fn test_memcached_flush_all_with_wait_success() {
1482        let (mut rx, mut client) = client!("OK\r\n");
1483        let wait = Some(Duration::from_secs(25));
1484        let res = client.flush_all(wait).await;
1485
1486        assert!(res.is_ok());
1487        let bytes = rx.recv().await.unwrap();
1488        let command = String::from_utf8(bytes).unwrap();
1489        assert_eq!("flush_all 25\r\n", command);
1490    }
1491
1492    #[tokio::test]
1493    async fn test_memcached_flush_all_with_wait_error() {
1494        let (mut rx, mut client) = client!("ERROR\r\n");
1495        let wait = Some(Duration::from_secs(25));
1496        let res = client.flush_all(wait).await;
1497
1498        assert!(res.is_err());
1499        let err = res.unwrap_err();
1500        assert_eq!(ErrorKind::Protocol, err.kind());
1501
1502        let bytes = rx.recv().await.unwrap();
1503        let command = String::from_utf8(bytes).unwrap();
1504        assert_eq!("flush_all 25\r\n", command);
1505    }
1506
1507    #[tokio::test]
1508    async fn test_memcached_flush_all_no_wait_success() {
1509        let (mut rx, mut client) = client!("OK\r\n");
1510        let res = client.flush_all(None).await;
1511
1512        assert!(res.is_ok());
1513        let bytes = rx.recv().await.unwrap();
1514        let command = String::from_utf8(bytes).unwrap();
1515        assert_eq!("flush_all\r\n", command);
1516    }
1517
1518    #[tokio::test]
1519    async fn test_memcached_flush_all_no_wait_error() {
1520        let (mut rx, mut client) = client!("ERROR\r\n");
1521        let res = client.flush_all(None).await;
1522
1523        assert!(res.is_err());
1524        let err = res.unwrap_err();
1525        assert_eq!(ErrorKind::Protocol, err.kind());
1526
1527        let bytes = rx.recv().await.unwrap();
1528        let command = String::from_utf8(bytes).unwrap();
1529        assert_eq!("flush_all\r\n", command);
1530    }
1531
1532    ///////////
1533    // stats //
1534    ///////////
1535
1536    #[tokio::test]
1537    async fn test_memcached_stats_empty() {
1538        let (_rx, mut client) = client!("END\r\n");
1539        let res = client.stats().await;
1540
1541        assert!(res.is_err());
1542        let err = res.unwrap_err();
1543        assert_eq!(ErrorKind::Runtime, err.kind());
1544    }
1545
1546    #[tokio::test]
1547    async fn test_memcached_stats_error() {
1548        let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1549        let res = client.stats().await;
1550
1551        assert!(res.is_err());
1552        let err = res.unwrap_err();
1553        assert_eq!(ErrorKind::Protocol, err.kind());
1554    }
1555
1556    #[tokio::test]
1557    async fn test_memcached_stats_success() {
1558        let (_rx, mut client) = client!(
1559            "STAT pid 1525\r\n",
1560            "STAT uptime 271984\r\n",
1561            "STAT time 1687212809\r\n",
1562            "STAT version 1.6.14\r\n",
1563            "STAT libevent 2.1.12-stable\r\n",
1564            "STAT pointer_size 64\r\n",
1565            "STAT rusage_user 17.544323\r\n",
1566            "STAT rusage_system 11.830461\r\n",
1567            "STAT max_connections 1024\r\n",
1568            "STAT curr_connections 1\r\n",
1569            "STAT total_connections 3\r\n",
1570            "STAT rejected_connections 0\r\n",
1571            "STAT connection_structures 2\r\n",
1572            "STAT response_obj_oom 0\r\n",
1573            "STAT response_obj_count 1\r\n",
1574            "STAT response_obj_bytes 32768\r\n",
1575            "STAT read_buf_count 4\r\n",
1576            "STAT read_buf_bytes 65536\r\n",
1577            "STAT read_buf_bytes_free 16384\r\n",
1578            "STAT read_buf_oom 0\r\n",
1579            "STAT reserved_fds 20\r\n",
1580            "STAT cmd_get 1\r\n",
1581            "STAT cmd_set 0\r\n",
1582            "STAT cmd_flush 0\r\n",
1583            "STAT cmd_touch 0\r\n",
1584            "STAT cmd_meta 0\r\n",
1585            "STAT get_hits 0\r\n",
1586            "STAT get_misses 1\r\n",
1587            "STAT get_expired 0\r\n",
1588            "STAT get_flushed 0\r\n",
1589            "STAT delete_misses 0\r\n",
1590            "STAT delete_hits 0\r\n",
1591            "STAT incr_misses 0\r\n",
1592            "STAT incr_hits 0\r\n",
1593            "STAT decr_misses 0\r\n",
1594            "STAT decr_hits 0\r\n",
1595            "STAT cas_misses 0\r\n",
1596            "STAT cas_hits 0\r\n",
1597            "STAT cas_badval 0\r\n",
1598            "STAT touch_hits 0\r\n",
1599            "STAT touch_misses 0\r\n",
1600            "STAT store_too_large 0\r\n",
1601            "STAT store_no_memory 0\r\n",
1602            "STAT auth_cmds 0\r\n",
1603            "STAT auth_errors 0\r\n",
1604            "STAT bytes_read 16\r\n",
1605            "STAT bytes_written 7\r\n",
1606            "STAT limit_maxbytes 67108864\r\n",
1607            "STAT accepting_conns 1\r\n",
1608            "STAT listen_disabled_num 0\r\n",
1609            "STAT time_in_listen_disabled_us 0\r\n",
1610            "STAT threads 4\r\n",
1611            "STAT conn_yields 0\r\n",
1612            "STAT hash_power_level 16\r\n",
1613            "STAT hash_bytes 524288\r\n",
1614            "STAT hash_is_expanding 0\r\n",
1615            "STAT slab_reassign_rescues 0\r\n",
1616            "STAT slab_reassign_chunk_rescues 0\r\n",
1617            "STAT slab_reassign_evictions_nomem 0\r\n",
1618            "STAT slab_reassign_inline_reclaim 0\r\n",
1619            "STAT slab_reassign_busy_items 0\r\n",
1620            "STAT slab_reassign_busy_deletes 0\r\n",
1621            "STAT slab_reassign_running 0\r\n",
1622            "STAT slabs_moved 0\r\n",
1623            "STAT lru_crawler_running 0\r\n",
1624            "STAT lru_crawler_starts 105\r\n",
1625            "STAT lru_maintainer_juggles 271976\r\n",
1626            "STAT malloc_fails 0\r\n",
1627            "STAT log_worker_dropped 0\r\n",
1628            "STAT log_worker_written 0\r\n",
1629            "STAT log_watcher_skipped 0\r\n",
1630            "STAT log_watcher_sent 0\r\n",
1631            "STAT log_watchers 0\r\n",
1632            "STAT unexpected_napi_ids 0\r\n",
1633            "STAT round_robin_fallback 0\r\n",
1634            "STAT bytes 0\r\n",
1635            "STAT curr_items 0\r\n",
1636            "STAT total_items 0\r\n",
1637            "STAT slab_global_page_pool 0\r\n",
1638            "STAT expired_unfetched 0\r\n",
1639            "STAT evicted_unfetched 0\r\n",
1640            "STAT evicted_active 0\r\n",
1641            "STAT evictions 0\r\n",
1642            "STAT reclaimed 0\r\n",
1643            "STAT crawler_reclaimed 0\r\n",
1644            "STAT crawler_items_checked 0\r\n",
1645            "STAT lrutail_reflocked 0\r\n",
1646            "STAT moves_to_cold 0\r\n",
1647            "STAT moves_to_warm 0\r\n",
1648            "STAT moves_within_lru 0\r\n",
1649            "STAT direct_reclaims 0\r\n",
1650            "STAT lru_bumps_dropped 0\r\n",
1651            "END\r\n",
1652        );
1653        let res = client.stats().await.unwrap();
1654
1655        assert_eq!(0, res.cmd_set);
1656        assert_eq!(1, res.cmd_get);
1657        assert_eq!(1, res.get_misses);
1658        assert_eq!(0, res.get_hits);
1659    }
1660
1661    ///////////
1662    // slabs //
1663    ///////////
1664
1665    #[tokio::test]
1666    async fn test_memcached_slabs_empty() {
1667        let (_rx, mut client) = client!("STAT active_slabs 0\r\n", "STAT total_malloced 0\r\n", "END\r\n");
1668        let res = client.slabs().await.unwrap();
1669
1670        assert!(res.slabs.is_empty());
1671    }
1672
1673    #[tokio::test]
1674    async fn test_memcached_slabs_error() {
1675        let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1676        let res = client.slabs().await;
1677
1678        assert!(res.is_err());
1679        let err = res.unwrap_err();
1680        assert_eq!(ErrorKind::Protocol, err.kind());
1681    }
1682
1683    #[tokio::test]
1684    async fn test_memcached_slabs_success() {
1685        let (_rx, mut client) = client!(
1686            "STAT 6:chunk_size 304\r\n",
1687            "STAT 6:chunks_per_page 3449\r\n",
1688            "STAT 6:total_pages 1\r\n",
1689            "STAT 6:total_chunks 3449\r\n",
1690            "STAT 6:used_chunks 1\r\n",
1691            "STAT 6:free_chunks 3448\r\n",
1692            "STAT 6:free_chunks_end 0\r\n",
1693            "STAT 6:get_hits 951\r\n",
1694            "STAT 6:cmd_set 100\r\n",
1695            "STAT 6:delete_hits 0\r\n",
1696            "STAT 6:incr_hits 0\r\n",
1697            "STAT 6:decr_hits 0\r\n",
1698            "STAT 6:cas_hits 0\r\n",
1699            "STAT 6:cas_badval 0\r\n",
1700            "STAT 6:touch_hits 0\r\n",
1701            "STAT 7:chunk_size 384\r\n",
1702            "STAT 7:chunks_per_page 2730\r\n",
1703            "STAT 7:total_pages 1\r\n",
1704            "STAT 7:total_chunks 2730\r\n",
1705            "STAT 7:used_chunks 5\r\n",
1706            "STAT 7:free_chunks 2725\r\n",
1707            "STAT 7:free_chunks_end 0\r\n",
1708            "STAT 7:get_hits 4792\r\n",
1709            "STAT 7:cmd_set 520\r\n",
1710            "STAT 7:delete_hits 0\r\n",
1711            "STAT 7:incr_hits 0\r\n",
1712            "STAT 7:decr_hits 0\r\n",
1713            "STAT 7:cas_hits 0\r\n",
1714            "STAT 7:cas_badval 0\r\n",
1715            "STAT 7:touch_hits 0\r\n",
1716            "STAT active_slabs 2\r\n",
1717            "STAT total_malloced 30408704\r\n",
1718            "END\r\n",
1719        );
1720        let res = client.slabs().await.unwrap();
1721
1722        let expected = vec![
1723            Slab {
1724                id: 6,
1725                chunk_size: 304,
1726                chunks_per_page: 3449,
1727                total_pages: 1,
1728                total_chunks: 3449,
1729                used_chunks: 1,
1730                free_chunks: 3448,
1731                get_hits: 951,
1732                cmd_set: 100,
1733                delete_hits: 0,
1734                incr_hits: 0,
1735                decr_hits: 0,
1736                cas_hits: 0,
1737                cas_badval: 0,
1738                touch_hits: 0,
1739            },
1740            Slab {
1741                id: 7,
1742                chunk_size: 384,
1743                chunks_per_page: 2730,
1744                total_pages: 1,
1745                total_chunks: 2730,
1746                used_chunks: 5,
1747                free_chunks: 2725,
1748                get_hits: 4792,
1749                cmd_set: 520,
1750                delete_hits: 0,
1751                incr_hits: 0,
1752                decr_hits: 0,
1753                cas_hits: 0,
1754                cas_badval: 0,
1755                touch_hits: 0,
1756            },
1757        ];
1758
1759        assert_eq!(expected, res.slabs);
1760    }
1761
1762    ///////////
1763    // items //
1764    ///////////
1765
1766    #[tokio::test]
1767    async fn test_memcached_items_empty() {
1768        let (_rx, mut client) = client!();
1769        let res = client.items().await.unwrap();
1770
1771        assert!(res.is_empty());
1772    }
1773
1774    #[tokio::test]
1775    async fn test_memcached_items_error() {
1776        let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1777        let res = client.items().await;
1778
1779        assert!(res.is_err());
1780        let err = res.unwrap_err();
1781        assert_eq!(ErrorKind::Protocol, err.kind());
1782    }
1783
1784    #[tokio::test]
1785    async fn test_memcached_items_success() {
1786        let (_rx, mut client) = client!(
1787            "STAT items:39:number 3\r\n",
1788            "STAT items:39:number_hot 0\r\n",
1789            "STAT items:39:number_warm 1\r\n",
1790            "STAT items:39:number_cold 2\r\n",
1791            "STAT items:39:age_hot 0\r\n",
1792            "STAT items:39:age_warm 7\r\n",
1793            "STAT items:39:age 8\r\n",
1794            "STAT items:39:mem_requested 1535788\r\n",
1795            "STAT items:39:evicted 1646\r\n",
1796            "STAT items:39:evicted_nonzero 1646\r\n",
1797            "STAT items:39:evicted_time 0\r\n",
1798            "STAT items:39:outofmemory 9\r\n",
1799            "STAT items:39:tailrepairs 0\r\n",
1800            "STAT items:39:reclaimed 13\r\n",
1801            "STAT items:39:expired_unfetched 4\r\n",
1802            "STAT items:39:evicted_unfetched 202\r\n",
1803            "STAT items:39:evicted_active 6\r\n",
1804            "STAT items:39:crawler_reclaimed 0\r\n",
1805            "STAT items:39:crawler_items_checked 40\r\n",
1806            "STAT items:39:lrutail_reflocked 17365\r\n",
1807            "STAT items:39:moves_to_cold 8703\r\n",
1808            "STAT items:39:moves_to_warm 7285\r\n",
1809            "STAT items:39:moves_within_lru 3651\r\n",
1810            "STAT items:39:direct_reclaims 1949\r\n",
1811            "STAT items:39:hits_to_hot 894\r\n",
1812            "STAT items:39:hits_to_warm 4079\r\n",
1813            "STAT items:39:hits_to_cold 8043\r\n",
1814            "STAT items:39:hits_to_temp 0\r\n",
1815            "END\r\n",
1816        );
1817        let res = client.items().await.unwrap();
1818
1819        let expected = SlabItems {
1820            items: vec![SlabItem {
1821                id: 39,
1822                number: 3,
1823                number_hot: 0,
1824                number_warm: 1,
1825                number_cold: 2,
1826                age_hot: 0,
1827                age_warm: 7,
1828                age: 8,
1829                mem_requested: 1535788,
1830                evicted: 1646,
1831                evicted_nonzero: 1646,
1832                evicted_time: 0,
1833                out_of_memory: 9,
1834                tail_repairs: 0,
1835                reclaimed: 13,
1836                expired_unfetched: 4,
1837                evicted_unfetched: 202,
1838                evicted_active: 6,
1839                crawler_reclaimed: 0,
1840                crawler_items_checked: 40,
1841                lrutail_reflocked: 17365,
1842                moves_to_cold: 8703,
1843                moves_to_warm: 7285,
1844                moves_within_lru: 3651,
1845                direct_reclaims: 1949,
1846                hits_to_hot: 894,
1847                hits_to_warm: 4079,
1848                hits_to_cold: 8043,
1849                hits_to_temp: 0,
1850            }],
1851        };
1852
1853        assert_eq!(expected, res);
1854    }
1855
1856    //////////
1857    // meta //
1858    //////////
1859
1860    #[tokio::test]
1861    async fn test_memcached_metas_empty() {
1862        let (_rx, mut client) = client!();
1863        let res = client.metas().await.unwrap();
1864
1865        assert!(res.is_empty());
1866    }
1867
1868    #[tokio::test]
1869    async fn test_memcached_metas_error() {
1870        let (_rx, mut client) = client!("BUSY crawler is busy\r\n",);
1871        let res = client.metas().await;
1872
1873        assert!(res.is_err());
1874        let err = res.unwrap_err();
1875        assert_eq!(ErrorKind::Protocol, err.kind());
1876    }
1877
1878    #[tokio::test]
1879    async fn test_memcached_metas_success() {
1880        let (_rx, mut client) = client!(
1881            "key=memcached%2Fmurmur3_hash.c exp=1687216956 la=1687216656 cas=259502 fetch=yes cls=17 size=2912\r\n",
1882            "key=memcached%2Fmd5.h exp=1687216956 la=1687216656 cas=259731 fetch=yes cls=17 size=3593\r\n",
1883            "END\r\n",
1884        );
1885        let res = client.metas().await.unwrap();
1886
1887        let expected = vec![
1888            Meta {
1889                key: "memcached/murmur3_hash.c".to_string(),
1890                expires: 1687216956,
1891                size: 2912,
1892            },
1893            Meta {
1894                key: "memcached/md5.h".to_string(),
1895                expires: 1687216956,
1896                size: 3593,
1897            },
1898        ];
1899
1900        assert_eq!(expected, res);
1901    }
1902}