mtop_client/
core.rs

1use std::borrow::Borrow;
2use std::cmp::Ordering;
3use std::collections::{BTreeSet, HashMap};
4use std::error;
5use std::fmt;
6use std::io;
7use std::ops::Deref;
8use std::str::FromStr;
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, Lines};
11
/// Server-wide statistics parsed from the raw key-value pairs of a stats response.
///
/// Values are built with the `TryFrom<&HashMap<String, String>>` implementation
/// for this type. Each field is parsed from the map entry with the same name as
/// the field unless a comment on the field says otherwise.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Stats {
    // Server info
    pub pid: i64,
    pub uptime: u64,
    // Parsed from the `time` entry rather than an entry named `server_time`.
    pub server_time: i64,
    pub threads: u64,
    pub version: String,

    // CPU
    pub rusage_user: f64,
    pub rusage_system: f64,

    // Connections
    pub max_connections: u64,
    pub curr_connections: u64,
    pub total_connections: u64,
    pub rejected_connections: u64,

    // Commands
    pub cmd_get: u64,
    pub cmd_set: u64,
    pub cmd_flush: u64,
    pub cmd_touch: u64,
    pub cmd_meta: u64,

    // Gets
    pub get_hits: u64,
    pub get_misses: u64,
    pub get_expired: u64,
    pub get_flushed: u64,

    // Sets
    pub store_too_large: u64,
    pub store_no_memory: u64,

    // Deletes
    pub delete_hits: u64,
    pub delete_misses: u64,

    // Incr/Decr
    pub incr_hits: u64,
    pub incr_misses: u64,
    pub decr_hits: u64,
    pub decr_misses: u64,

    // Touches
    pub touch_hits: u64,
    pub touch_misses: u64,

    // Bytes
    pub bytes_read: u64,
    pub bytes_written: u64,
    pub bytes: u64,
    // Parsed from the `limit_maxbytes` entry rather than an entry named `max_bytes`.
    pub max_bytes: u64,

    // Items
    pub curr_items: u64,
    pub total_items: u64,
    pub evictions: u64,
}
73
74impl TryFrom<&HashMap<String, String>> for Stats {
75    type Error = MtopError;
76
77    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
78        Ok(Stats {
79            pid: parse_field("pid", value)?,
80            uptime: parse_field("uptime", value)?,
81            server_time: parse_field("time", value)?,
82            version: parse_field("version", value)?,
83            threads: parse_field("threads", value)?,
84
85            rusage_user: parse_field("rusage_user", value)?,
86            rusage_system: parse_field("rusage_system", value)?,
87
88            max_connections: parse_field("max_connections", value)?,
89            curr_connections: parse_field("curr_connections", value)?,
90            total_connections: parse_field("total_connections", value)?,
91            rejected_connections: parse_field("rejected_connections", value)?,
92
93            cmd_get: parse_field("cmd_get", value)?,
94            cmd_set: parse_field("cmd_set", value)?,
95            cmd_flush: parse_field("cmd_flush", value)?,
96            cmd_touch: parse_field("cmd_touch", value)?,
97            cmd_meta: parse_field("cmd_meta", value)?,
98
99            get_hits: parse_field("get_hits", value)?,
100            get_misses: parse_field("get_misses", value)?,
101            get_expired: parse_field("get_expired", value)?,
102            get_flushed: parse_field("get_flushed", value)?,
103
104            store_too_large: parse_field("store_too_large", value)?,
105            store_no_memory: parse_field("store_no_memory", value)?,
106
107            delete_hits: parse_field("delete_hits", value)?,
108            delete_misses: parse_field("delete_misses", value)?,
109
110            incr_hits: parse_field("incr_hits", value)?,
111            incr_misses: parse_field("incr_misses", value)?,
112
113            decr_hits: parse_field("decr_hits", value)?,
114            decr_misses: parse_field("decr_misses", value)?,
115
116            touch_hits: parse_field("touch_hits", value)?,
117            touch_misses: parse_field("touch_misses", value)?,
118
119            bytes_read: parse_field("bytes_read", value)?,
120            bytes_written: parse_field("bytes_written", value)?,
121            bytes: parse_field("bytes", value)?,
122            max_bytes: parse_field("limit_maxbytes", value)?,
123
124            curr_items: parse_field("curr_items", value)?,
125            total_items: parse_field("total_items", value)?,
126            evictions: parse_field("evictions", value)?,
127        })
128    }
129}
130
/// Statistics for a single slab class, identified by `id`.
///
/// Equality and hashing take every field into account while ordering only
/// looks at `id`.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Slab {
    pub id: u64,
    pub chunk_size: u64,
    pub chunks_per_page: u64,
    pub total_pages: u64,
    pub total_chunks: u64,
    pub used_chunks: u64,
    pub free_chunks: u64,
    pub get_hits: u64,
    pub cmd_set: u64,
    pub delete_hits: u64,
    pub incr_hits: u64,
    pub decr_hits: u64,
    pub cas_hits: u64,
    pub cas_badval: u64,
    pub touch_hits: u64,
}

impl PartialOrd for Slab {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        // Slabs are totally ordered, defer to `Ord`.
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for Slab {
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.id, &rhs.id)
    }
}

/// Collection of `Slab` entries in the order they were stored.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
#[repr(transparent)]
pub struct Slabs {
    slabs: Vec<Slab>,
}

impl Slabs {
    /// Iterate over the slabs by reference.
    pub fn iter(&self) -> impl ExactSizeIterator<Item = &Slab> {
        self.slabs.as_slice().iter()
    }

    /// Number of slabs in the collection.
    pub fn len(&self) -> usize {
        self.slabs.len()
    }

    /// `true` when the collection holds no slabs.
    pub fn is_empty(&self) -> bool {
        self.slabs.len() == 0
    }

    /// Find the slab whose chunk size is appropriate for an item of `size` bytes.
    ///
    /// The first slab whose `chunk_size` is at least `size` is returned. When no
    /// slab is large enough the last (hence largest) slab is returned since this
    /// is what Memcached does internally. Returns `None` only when there are no
    /// slabs at all. The lookup is a binary search, so it relies on the slabs
    /// being stored with non-decreasing `chunk_size`.
    pub fn find_for_size(&self, size: u64) -> Option<&Slab> {
        let first_fit = self.slabs.partition_point(|slab| slab.chunk_size < size);

        match self.slabs.get(first_fit) {
            Some(slab) => Some(slab),
            None => self.slabs.last(),
        }
    }
}

impl IntoIterator for Slabs {
    type Item = Slab;
    type IntoIter = std::vec::IntoIter<Slab>;

    /// Consume the collection, yielding each `Slab` by value.
    fn into_iter(self) -> Self::IntoIter {
        let Slabs { slabs } = self;
        slabs.into_iter()
    }
}
199
200impl TryFrom<&HashMap<String, String>> for Slabs {
201    type Error = MtopError;
202
203    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
204        // Parse the slab IDs from each of the raw stats. We have to do this because
205        // Memcached isn't guaranteed to use a particular slab ID if there are no items
206        // to store in that size class. Otherwise, we could just loop from one to
207        // $active_slabs + 1.
208        let mut ids = BTreeSet::new();
209        for k in value.keys() {
210            let key_id: Option<u64> = k.split_once(':').map(|(raw, _rest)| raw).and_then(|raw| raw.parse().ok());
211
212            if let Some(id) = key_id {
213                ids.insert(id);
214            }
215        }
216
217        let mut slabs = Vec::with_capacity(ids.len());
218
219        for id in ids {
220            slabs.push(Slab {
221                id,
222                chunk_size: parse_field(&format!("{}:chunk_size", id), value)?,
223                chunks_per_page: parse_field(&format!("{}:chunks_per_page", id), value)?,
224                total_pages: parse_field(&format!("{}:total_pages", id), value)?,
225                total_chunks: parse_field(&format!("{}:total_chunks", id), value)?,
226                used_chunks: parse_field(&format!("{}:used_chunks", id), value)?,
227                free_chunks: parse_field(&format!("{}:free_chunks", id), value)?,
228                get_hits: parse_field(&format!("{}:get_hits", id), value)?,
229                cmd_set: parse_field(&format!("{}:cmd_set", id), value)?,
230                delete_hits: parse_field(&format!("{}:delete_hits", id), value)?,
231                incr_hits: parse_field(&format!("{}:incr_hits", id), value)?,
232                decr_hits: parse_field(&format!("{}:decr_hits", id), value)?,
233                cas_hits: parse_field(&format!("{}:cas_hits", id), value)?,
234                cas_badval: parse_field(&format!("{}:cas_badval", id), value)?,
235                touch_hits: parse_field(&format!("{}:touch_hits", id), value)?,
236            })
237        }
238
239        Ok(Self { slabs })
240    }
241}
242
/// Item statistics for a single slab class, identified by `id`.
///
/// Equality and hashing take every field into account while ordering only
/// looks at `id`.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct SlabItem {
    pub id: u64,
    pub number: u64,
    pub number_hot: u64,
    pub number_warm: u64,
    pub number_cold: u64,
    pub age_hot: u64,
    pub age_warm: u64,
    pub age: u64,
    pub mem_requested: u64,
    pub evicted: u64,
    pub evicted_nonzero: u64,
    pub evicted_time: u64,
    pub out_of_memory: u64,
    pub tail_repairs: u64,
    pub reclaimed: u64,
    pub expired_unfetched: u64,
    pub evicted_unfetched: u64,
    pub evicted_active: u64,
    pub crawler_reclaimed: u64,
    pub crawler_items_checked: u64,
    pub lrutail_reflocked: u64,
    pub moves_to_cold: u64,
    pub moves_to_warm: u64,
    pub moves_within_lru: u64,
    pub direct_reclaims: u64,
    pub hits_to_hot: u64,
    pub hits_to_warm: u64,
    pub hits_to_cold: u64,
    pub hits_to_temp: u64,
}

impl PartialOrd for SlabItem {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        // Slab items are totally ordered, defer to `Ord`.
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for SlabItem {
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.id, &rhs.id)
    }
}

/// Collection of `SlabItem` entries in the order they were stored.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
#[repr(transparent)]
pub struct SlabItems {
    items: Vec<SlabItem>,
}

impl SlabItems {
    /// Iterate over the slab items by reference.
    pub fn iter(&self) -> impl ExactSizeIterator<Item = &SlabItem> {
        self.items.as_slice().iter()
    }

    /// Number of slab items in the collection.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// `true` when the collection holds no slab items.
    pub fn is_empty(&self) -> bool {
        self.items.len() == 0
    }
}

impl IntoIterator for SlabItems {
    type Item = SlabItem;
    type IntoIter = std::vec::IntoIter<SlabItem>;

    /// Consume the collection, yielding each `SlabItem` by value.
    fn into_iter(self) -> Self::IntoIter {
        let SlabItems { items } = self;
        items.into_iter()
    }
}
316
317impl TryFrom<&HashMap<String, String>> for SlabItems {
318    type Error = MtopError;
319
320    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
321        // Parse the slab IDs from each of the raw stats. We have to do this because
322        // Memcached isn't guaranteed to use a particular slab ID if there are no items
323        // to store in that size class. Otherwise, we could just loop from one to
324        // $active_slabs + 1.
325        let mut ids = BTreeSet::new();
326        for k in value.keys() {
327            let key_id: Option<u64> = k
328                .trim_start_matches("items:")
329                .split_once(':')
330                .map(|(raw, _rest)| raw)
331                .and_then(|raw| raw.parse().ok());
332
333            if let Some(id) = key_id {
334                ids.insert(id);
335            }
336        }
337
338        let mut items = Vec::with_capacity(ids.len());
339
340        for id in ids {
341            items.push(SlabItem {
342                id,
343                number: parse_field(&format!("items:{}:number", id), value)?,
344                number_hot: parse_field(&format!("items:{}:number_hot", id), value)?,
345                number_warm: parse_field(&format!("items:{}:number_warm", id), value)?,
346                number_cold: parse_field(&format!("items:{}:number_cold", id), value)?,
347                age_hot: parse_field(&format!("items:{}:age_hot", id), value)?,
348                age_warm: parse_field(&format!("items:{}:age_warm", id), value)?,
349                age: parse_field(&format!("items:{}:age", id), value)?,
350                mem_requested: parse_field(&format!("items:{}:mem_requested", id), value)?,
351                evicted: parse_field(&format!("items:{}:evicted", id), value)?,
352                evicted_nonzero: parse_field(&format!("items:{}:evicted_nonzero", id), value)?,
353                evicted_time: parse_field(&format!("items:{}:evicted_time", id), value)?,
354                out_of_memory: parse_field(&format!("items:{}:outofmemory", id), value)?,
355                tail_repairs: parse_field(&format!("items:{}:tailrepairs", id), value)?,
356                reclaimed: parse_field(&format!("items:{}:reclaimed", id), value)?,
357                expired_unfetched: parse_field(&format!("items:{}:expired_unfetched", id), value)?,
358                evicted_unfetched: parse_field(&format!("items:{}:evicted_unfetched", id), value)?,
359                evicted_active: parse_field(&format!("items:{}:evicted_active", id), value)?,
360                crawler_reclaimed: parse_field(&format!("items:{}:crawler_reclaimed", id), value)?,
361                crawler_items_checked: parse_field(&format!("items:{}:crawler_items_checked", id), value)?,
362                lrutail_reflocked: parse_field(&format!("items:{}:lrutail_reflocked", id), value)?,
363                moves_to_cold: parse_field(&format!("items:{}:moves_to_cold", id), value)?,
364                moves_to_warm: parse_field(&format!("items:{}:moves_to_warm", id), value)?,
365                moves_within_lru: parse_field(&format!("items:{}:moves_within_lru", id), value)?,
366                direct_reclaims: parse_field(&format!("items:{}:direct_reclaims", id), value)?,
367                hits_to_hot: parse_field(&format!("items:{}:hits_to_hot", id), value)?,
368                hits_to_warm: parse_field(&format!("items:{}:hits_to_warm", id), value)?,
369                hits_to_cold: parse_field(&format!("items:{}:hits_to_cold", id), value)?,
370                hits_to_temp: parse_field(&format!("items:{}:hits_to_temp", id), value)?,
371            })
372        }
373
374        Ok(Self { items })
375    }
376}
377
378#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
379pub struct Meta {
380    pub key: String,
381    pub expires: i64,
382    pub size: u64,
383}
384
385impl Meta {
386    // Meta information is returned from the server as multiple key-value pairs per
387    // line. We only care about a subset of those keys. Define them here to avoid doing
388    // extra work when parsing the server response.
389    const KEYS: &'static [&'static str] = &["key", "exp", "size"];
390}
391
392impl TryFrom<&HashMap<String, String>> for Meta {
393    type Error = MtopError;
394
395    fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
396        Ok(Meta {
397            key: parse_field("key", value)?,
398            expires: parse_field("exp", value)?,
399            size: parse_field("size", value)?,
400        })
401    }
402}
403
404impl PartialOrd for Meta {
405    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
406        Some(self.cmp(other))
407    }
408}
409
410impl Ord for Meta {
411    fn cmp(&self, other: &Self) -> Ordering {
412        self.key.cmp(&other.key)
413    }
414}
415
/// A cached item: its key, CAS value, flags and raw data.
///
/// Equality and hashing take every field into account while ordering only
/// looks at `key`.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Value {
    pub key: String,
    pub cas: u64,
    pub flags: u64,
    pub data: Vec<u8>,
}

impl PartialOrd for Value {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        // Values are totally ordered, defer to `Ord`.
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for Value {
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.key, &rhs.key)
    }
}
435
436fn parse_field<T>(key: &str, map: &HashMap<String, String>) -> Result<T, MtopError>
437where
438    T: FromStr,
439    <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
440{
441    map.get(key)
442        .ok_or_else(|| MtopError::runtime(format!("field {} missing", key)))
443        .and_then(|v| {
444            v.parse()
445                .map_err(|e| MtopError::runtime_cause(format!("field {} value '{}'", key, v), e))
446        })
447}
448
449fn parse_value<T>(val: &str, line: &str) -> Result<T, MtopError>
450where
451    T: FromStr + fmt::Display,
452    <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
453{
454    val.parse()
455        .map_err(|e| MtopError::runtime_cause(format!("parsing {} from '{}'", val, line), e))
456}
457
/// Broad category of an `MtopError`.
#[derive(Debug, PartialOrd, PartialEq, Copy, Clone)]
pub enum ErrorKind {
    Runtime,
    IO,
    Protocol,
    Configuration,
}

impl fmt::Display for ErrorKind {
    /// Write the lowercase, human-readable name of the category.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            ErrorKind::Runtime => "runtime error",
            ErrorKind::IO => "io error",
            ErrorKind::Protocol => "protocol error",
            ErrorKind::Configuration => "configuration error",
        };

        f.write_str(label)
    }
}

/// Payload of an `MtopError`: a message, an underlying cause, or both.
#[derive(Debug)]
enum ErrorRepr {
    Message(String),
    Cause(Box<dyn error::Error + Send + Sync + 'static>),
    MessageCause(String, Box<dyn error::Error + Send + Sync + 'static>),
}

/// Error type used throughout the client.
///
/// Every error has an `ErrorKind` plus a message, an underlying cause, or both.
/// The cause, when there is one, is available via `std::error::Error::source`.
#[derive(Debug)]
pub struct MtopError {
    kind: ErrorKind,
    repr: ErrorRepr,
}

impl MtopError {
    /// Create a `Runtime` error with only a message.
    pub fn runtime<S>(msg: S) -> MtopError
    where
        S: Into<String>,
    {
        let repr = ErrorRepr::Message(msg.into());
        Self {
            kind: ErrorKind::Runtime,
            repr,
        }
    }

    /// Create a `Runtime` error with a message and the error that caused it.
    pub fn runtime_cause<S, E>(msg: S, e: E) -> MtopError
    where
        S: Into<String>,
        E: error::Error + Send + Sync + 'static,
    {
        let repr = ErrorRepr::MessageCause(msg.into(), Box::new(e));
        Self {
            kind: ErrorKind::Runtime,
            repr,
        }
    }

    /// Create a `Configuration` error with only a message.
    pub fn configuration<S>(msg: S) -> MtopError
    where
        S: Into<String>,
    {
        let repr = ErrorRepr::Message(msg.into());
        Self {
            kind: ErrorKind::Configuration,
            repr,
        }
    }

    /// Create a `Configuration` error with a message and the error that caused it.
    pub fn configuration_cause<S, E>(msg: S, e: E) -> MtopError
    where
        S: Into<String>,
        E: error::Error + Send + Sync + 'static,
    {
        let repr = ErrorRepr::MessageCause(msg.into(), Box::new(e));
        Self {
            kind: ErrorKind::Configuration,
            repr,
        }
    }

    /// Create an `IO` error describing that `operation` did not finish within `t`.
    pub fn timeout<D>(t: Duration, operation: D) -> MtopError
    where
        D: fmt::Display,
    {
        let msg = format!("operation {} timed out after {:?}", operation, t);
        Self {
            kind: ErrorKind::IO,
            repr: ErrorRepr::Message(msg),
        }
    }

    /// The category of this error.
    pub fn kind(&self) -> ErrorKind {
        self.kind
    }
}

impl fmt::Display for MtopError {
    /// Write the kind followed by the message and/or cause, separated by `": "`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let MtopError { kind, repr } = self;

        match repr {
            ErrorRepr::Message(msg) => write!(f, "{}: {}", kind, msg),
            ErrorRepr::Cause(cause) => write!(f, "{}: {}", kind, cause),
            ErrorRepr::MessageCause(msg, cause) => write!(f, "{}: {}: {}", kind, msg, cause),
        }
    }
}

impl error::Error for MtopError {
    /// The underlying cause, if this error was created with one.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match &self.repr {
            ErrorRepr::Message(_) => None,
            ErrorRepr::Cause(cause) | ErrorRepr::MessageCause(_, cause) => Some(cause.as_ref()),
        }
    }
}

impl From<(String, io::Error)> for MtopError {
    /// Wrap an I/O error together with a message describing what was being done.
    fn from(pair: (String, io::Error)) -> Self {
        let (msg, cause) = pair;
        Self {
            kind: ErrorKind::IO,
            repr: ErrorRepr::MessageCause(msg, Box::new(cause)),
        }
    }
}

impl From<io::Error> for MtopError {
    /// Wrap an I/O error as the cause of an `IO` error.
    fn from(e: io::Error) -> Self {
        let repr = ErrorRepr::Cause(Box::new(e));
        Self {
            kind: ErrorKind::IO,
            repr,
        }
    }
}
585
586impl From<ProtocolError> for MtopError {
587    fn from(e: ProtocolError) -> Self {
588        MtopError {
589            kind: ErrorKind::Protocol,
590            repr: ErrorRepr::Cause(Box::new(e)),
591        }
592    }
593}
594
/// Category of an error response. Each variant is displayed as the
/// corresponding upper-case response keyword.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum ProtocolErrorKind {
    BadClass,
    Busy,
    Client,
    NotFound,
    NotStored,
    Server,
    Syntax,
}

impl fmt::Display for ProtocolErrorKind {
    /// Write the response keyword for this kind, honoring any width or
    /// alignment requested by the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            ProtocolErrorKind::BadClass => "BADCLASS",
            ProtocolErrorKind::Busy => "BUSY",
            ProtocolErrorKind::Client => "CLIENT_ERROR",
            ProtocolErrorKind::NotFound => "NOT_FOUND",
            ProtocolErrorKind::NotStored => "NOT_STORED",
            ProtocolErrorKind::Server => "SERVER_ERROR",
            ProtocolErrorKind::Syntax => "ERROR",
        };

        fmt::Display::fmt(keyword, f)
    }
}

/// An error response: its kind plus the optional message that accompanied it.
#[derive(Debug)]
pub struct ProtocolError {
    kind: ProtocolErrorKind,
    message: Option<String>,
}

impl ProtocolError {
    /// The category of this error response.
    pub fn kind(&self) -> ProtocolErrorKind {
        self.kind
    }
}

impl fmt::Display for ProtocolError {
    /// Write the kind, followed by a space and the message when there is one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.message.as_deref() {
            Some(msg) => write!(f, "{} {}", self.kind, msg),
            None => write!(f, "{}", self.kind),
        }
    }
}

impl error::Error for ProtocolError {}
643
644#[derive(Debug, Eq, PartialEq, Clone)]
645enum Command<'a> {
646    Add(&'a Key, u64, u32, &'a [u8]),
647    CrawlerMetadump,
648    Decr(&'a Key, u64),
649    Delete(&'a Key),
650    FlushAll,
651    FlushAllWait(u64),
652    Gets(&'a [Key]),
653    Incr(&'a Key, u64),
654    Replace(&'a Key, u64, u32, &'a [u8]),
655    Stats,
656    StatsItems,
657    StatsSlabs,
658    Set(&'a Key, u64, u32, &'a [u8]),
659    Touch(&'a Key, u32),
660    Version,
661}
662
663impl<'a> From<Command<'a>> for Vec<u8> {
664    fn from(value: Command<'a>) -> Self {
665        match value {
666            Command::Add(key, flags, ttl, data) => storage_command("add", key, flags, ttl, data),
667            Command::CrawlerMetadump => "lru_crawler metadump hash\r\n".to_owned().into_bytes(),
668            Command::Decr(key, delta) => format!("decr {} {}\r\n", key, delta).into_bytes(),
669            Command::Delete(key) => format!("delete {}\r\n", key).into_bytes(),
670            Command::FlushAll => "flush_all\r\n".to_owned().into_bytes(),
671            Command::FlushAllWait(wait) => format!("flush_all {}\r\n", wait).into_bytes(),
672            Command::Gets(keys) => format!("gets {}\r\n", keys.join(" ")).into_bytes(),
673            Command::Incr(key, delta) => format!("incr {} {}\r\n", key, delta).into_bytes(),
674            Command::Replace(key, flags, ttl, data) => storage_command("replace", key, flags, ttl, data),
675            Command::Stats => "stats\r\n".to_owned().into_bytes(),
676            Command::StatsItems => "stats items\r\n".to_owned().into_bytes(),
677            Command::StatsSlabs => "stats slabs\r\n".to_owned().into_bytes(),
678            Command::Set(key, flags, ttl, data) => storage_command("set", key, flags, ttl, data),
679            Command::Touch(key, ttl) => format!("touch {} {}\r\n", key, ttl).into_bytes(),
680            Command::Version => "version\r\n".to_owned().into_bytes(),
681        }
682    }
683}
684
685fn storage_command(verb: &str, key: &Key, flags: u64, ttl: u32, data: &[u8]) -> Vec<u8> {
686    let mut bytes = Vec::with_capacity(key.len() + data.len() + 32);
687    io::Write::write_all(
688        &mut bytes,
689        format!("{} {} {} {} {}\r\n", verb, key, flags, ttl, data.len()).as_bytes(),
690    )
691    .unwrap();
692    io::Write::write_all(&mut bytes, data).unwrap();
693    io::Write::write_all(&mut bytes, "\r\n".as_bytes()).unwrap();
694    bytes
695}
696
/// Client for a single server connection, made up of a read half used for
/// responses and a write half used for commands.
pub struct Memcached {
    // Buffered reader over the boxed read half, consumed line by line.
    read: Lines<BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>>,
    // Buffered writer over the boxed write half.
    write: BufWriter<Box<dyn AsyncWrite + Send + Sync + Unpin>>,
}
701
702impl Memcached {
    /// Largest value length, in bytes, that the client accepts in a `VALUE`
    /// response line (1 GiB). Longer values are rejected with an error.
    const MAX_PAYLOAD_SIZE: u64 = 1024 * 1024 * 1024;
704
705    pub fn new<R, W>(read: R, write: W) -> Self
706    where
707        R: AsyncRead + Send + Sync + Unpin + 'static,
708        W: AsyncWrite + Send + Sync + Unpin + 'static,
709    {
710        Memcached {
711            read: BufReader::<Box<dyn AsyncRead + Send + Sync + Unpin>>::new(Box::new(read)).lines(),
712            write: BufWriter::new(Box::new(write)),
713        }
714    }
715
716    /// Get a `Stats` object with the current values of the interesting stats for the server.
717    pub async fn stats(&mut self) -> Result<Stats, MtopError> {
718        self.send(Command::Stats).await?;
719        let raw = self.read_stats_response().await?;
720        Stats::try_from(&raw)
721    }
722
723    /// Get a `Slabs` object with information about each set of `Slab`s maintained by
724    /// the Memcached server. You can think of each `Slab` as a class of objects that
725    /// are stored together in memory. Note that `Slab` IDs may not be contiguous based
726    /// on the size of items actually stored by the server.
727    pub async fn slabs(&mut self) -> Result<Slabs, MtopError> {
728        self.send(Command::StatsSlabs).await?;
729        let raw = self.read_stats_response().await?;
730        Slabs::try_from(&raw)
731    }
732
733    /// Get a `SlabsItems` object with information about the `SlabItem` items stored in
734    /// each slab class maintained by the Memcached server. The ID of each `SlabItem`
735    /// corresponds to a `Slab` maintained by the server. Note that `SlabItem` IDs may
736    /// not be contiguous based on the size of items actually stored by the server.
737    pub async fn items(&mut self) -> Result<SlabItems, MtopError> {
738        self.send(Command::StatsItems).await?;
739        let raw = self.read_stats_response().await?;
740        SlabItems::try_from(&raw)
741    }
742
743    async fn read_stats_response(&mut self) -> Result<HashMap<String, String>, MtopError> {
744        let mut out = HashMap::new();
745
746        while let Some(v) = self.read.next_line().await? {
747            if v == "END" {
748                break;
749            }
750
751            let (key, val) = Self::parse_stat_line(&v)?;
752            out.insert(key.to_owned(), val.to_owned());
753        }
754
755        Ok(out)
756    }
757
758    fn parse_stat_line(line: &str) -> Result<(&str, &str), MtopError> {
759        let mut parts = line.splitn(3, ' ');
760        match (parts.next(), parts.next(), parts.next()) {
761            (Some("STAT"), Some(key), Some(val)) => Ok((key, val)),
762            _ => {
763                if let Some(err) = Self::parse_error(line) {
764                    Err(MtopError::from(err))
765                } else {
766                    Err(MtopError::runtime(format!("unable to parse '{}'", line)))
767                }
768            }
769        }
770    }
771
772    /// Get a `Meta` object for every item in the cache which includes its key and expiration
773    /// time as a UNIX timestamp. Expiration time will be `-1` if the item was set with an
774    /// infinite TTL.
775    pub async fn metas(&mut self) -> Result<Vec<Meta>, MtopError> {
776        self.send(Command::CrawlerMetadump).await?;
777        let mut out = Vec::new();
778        let mut raw = HashMap::new();
779
780        while let Some(v) = self.read.next_line().await? {
781            if v == "END" {
782                break;
783            }
784
785            // Check for an error first because the `metadump` command doesn't
786            // have any sort of prefix for each result line like `STAT` or `VALUE`
787            // so it's hard to know if it's valid without looking for an error.
788            if let Some(err) = Self::parse_error(&v) {
789                return Err(MtopError::from(err));
790            }
791
792            let item = Self::parse_crawler_meta(&v, Meta::KEYS, &mut raw)?;
793            out.push(item);
794        }
795
796        Ok(out)
797    }
798
799    fn parse_crawler_meta(line: &str, keys: &[&str], raw: &mut HashMap<String, String>) -> Result<Meta, MtopError> {
800        // Avoid allocating a new HashMap to parse every meta entry just to throw it away
801        raw.clear();
802
803        for p in line.split(' ') {
804            let (key, val) = p
805                .split_once('=')
806                .ok_or_else(|| MtopError::runtime(format!("unexpected metadump format '{}'", line)))?;
807
808            // Avoid spending time decoding values or allocating for data we don't care about.
809            // Use a slice here since it's faster than a HashSet when the number of entries is
810            // small and the number of keys we're searching through is always small.
811            if !keys.contains(&key) {
812                continue;
813            }
814
815            let decoded = urlencoding::decode(val)
816                .map_err(|e| MtopError::runtime_cause(format!("unexpected metadump encoding '{}'", line), e))?;
817            raw.insert(key.to_owned(), decoded.into_owned());
818        }
819
820        Meta::try_from(raw.deref())
821    }
822
823    /// Send a simple command to verify our connection to the server is working.
824    pub async fn ping(&mut self) -> Result<(), MtopError> {
825        self.send(Command::Version).await?;
826        if let Some(v) = self.read.next_line().await? {
827            if let Some(e) = Self::parse_error(&v) {
828                return Err(MtopError::from(e));
829            }
830            if !v.starts_with("VERSION") {
831                return Err(MtopError::runtime(format!("unable to parse '{}'", v)));
832            }
833        }
834
835        Ok(())
836    }
837
838    /// Flush all entries in the cache, optionally after a delay. When a delay is used, the
839    /// server will flush entries after a delay but the call will still return immediately.
840    pub async fn flush_all(&mut self, wait: Option<Duration>) -> Result<(), MtopError> {
841        let cmd = if let Some(d) = wait {
842            Command::FlushAllWait(d.as_secs())
843        } else {
844            Command::FlushAll
845        };
846
847        self.send(cmd).await?;
848        self.read_simple_response("OK").await
849    }
850
851    /// Get a map of the requested keys and their corresponding `Value` in the cache
852    /// including the key, flags, and data.
853    pub async fn get(&mut self, keys: &[Key]) -> Result<HashMap<String, Value>, MtopError> {
854        self.send(Command::Gets(keys)).await?;
855        let mut out = HashMap::with_capacity(keys.len());
856
857        while let Some(v) = self.read.next_line().await? {
858            if v == "END" {
859                break;
860            }
861
862            let value = self.parse_gets_value(&v).await?;
863            out.insert(value.key.clone(), value);
864        }
865
866        Ok(out)
867    }
868
869    async fn parse_gets_value(&mut self, line: &str) -> Result<Value, MtopError> {
870        let mut parts = line.splitn(5, ' ');
871
872        match (parts.next(), parts.next(), parts.next(), parts.next(), parts.next()) {
873            (Some("VALUE"), Some(k), Some(flags), Some(len), Some(cas)) => {
874                let flags: u64 = parse_value(flags, line)?;
875                let len: u64 = parse_value(len, line)?;
876                let cas: u64 = parse_value(cas, line)?;
877
878                // The max size of an object in Memcached is represented with a `u64` which
879                // means it's basically infinite. In practice the default max size of an object
880                // in a Memcached server is 1MB but can be configured higher. Place a limit on
881                // the size that we'll accept here to avoid a denial of service from bad lengths.
882                if len > Self::MAX_PAYLOAD_SIZE {
883                    return Err(MtopError::runtime(format!(
884                        "server response of length {} exceeds client max of {}",
885                        len,
886                        Self::MAX_PAYLOAD_SIZE
887                    )));
888                }
889
890                // Two extra bytes to read the trailing \r\n but then truncate them.
891                let mut data = Vec::with_capacity(len as usize + 2);
892                let reader = self.read.get_mut();
893                reader.take(len + 2).read_to_end(&mut data).await?;
894                data.truncate(len as usize);
895
896                Ok(Value {
897                    key: k.to_owned(),
898                    flags,
899                    cas,
900                    data,
901                })
902            }
903            _ => {
904                // Response doesn't look like a `VALUE` line, see if the server has
905                // responded with an error that we can parse. Otherwise, consider this
906                // an internal error.
907                if let Some(err) = Self::parse_error(line) {
908                    Err(MtopError::from(err))
909                } else {
910                    Err(MtopError::runtime(format!("unable to parse '{}'", line)))
911                }
912            }
913        }
914    }
915
916    /// Increment the value of a key by the given delta if the value is numeric returning
917    /// the new value. Returns an error if the value is _not_ numeric.
918    pub async fn incr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
919        self.send(Command::Incr(key, delta)).await?;
920        if let Some(v) = self.read.next_line().await? {
921            Self::parse_numeric_response(&v)
922        } else {
923            Err(MtopError::runtime("unexpected empty response"))
924        }
925    }
926
927    /// Decrement the value of a key by the given delta if the value is numeric returning
928    /// the new value with a minimum of 0. Returns an error if the value is _not_ numeric.
929    pub async fn decr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
930        self.send(Command::Decr(key, delta)).await?;
931        if let Some(v) = self.read.next_line().await? {
932            Self::parse_numeric_response(&v)
933        } else {
934            Err(MtopError::runtime("unexpected empty response"))
935        }
936    }
937
938    fn parse_numeric_response(line: &str) -> Result<u64, MtopError> {
939        if let Some(err) = Self::parse_error(line) {
940            Err(MtopError::from(err))
941        } else {
942            line.parse()
943                .map_err(|_e| MtopError::runtime(format!("unable to parse '{}'", line)))
944        }
945    }
946
947    /// Store the provided item in the cache, regardless of whether it already exists.
948    pub async fn set<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
949    where
950        V: AsRef<[u8]>,
951    {
952        self.send(Command::Set(key, flags, ttl, data.as_ref())).await?;
953        self.read_simple_response("STORED").await
954    }
955
956    /// Store the provided item in the cache only if it does not already exist.
957    pub async fn add<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
958    where
959        V: AsRef<[u8]>,
960    {
961        self.send(Command::Add(key, flags, ttl, data.as_ref())).await?;
962        self.read_simple_response("STORED").await
963    }
964
965    /// Store the provided item in the cache only if it already exists.
966    pub async fn replace<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
967    where
968        V: AsRef<[u8]>,
969    {
970        self.send(Command::Replace(key, flags, ttl, data.as_ref())).await?;
971        self.read_simple_response("STORED").await
972    }
973
974    /// Update the TTL of an item in the cache if it exists, return an error otherwise.
975    pub async fn touch(&mut self, key: &Key, ttl: u32) -> Result<(), MtopError> {
976        self.send(Command::Touch(key, ttl)).await?;
977        self.read_simple_response("TOUCHED").await
978    }
979
980    /// Delete an item in the cache if it exists, return an error otherwise.
981    pub async fn delete(&mut self, key: &Key) -> Result<(), MtopError> {
982        self.send(Command::Delete(key)).await?;
983        self.read_simple_response("DELETED").await
984    }
985
986    async fn read_simple_response(&mut self, expected: &str) -> Result<(), MtopError> {
987        match self.read.next_line().await? {
988            Some(line) if line == expected => Ok(()),
989            Some(line) => {
990                if let Some(err) = Self::parse_error(&line) {
991                    Err(MtopError::from(err))
992                } else {
993                    Err(MtopError::runtime(format!("unable to parse '{}'", line)))
994                }
995            }
996            None => Err(MtopError::runtime("unexpected empty response")),
997        }
998    }
999
1000    fn parse_error(line: &str) -> Option<ProtocolError> {
1001        let mut values = line.splitn(2, ' ');
1002        let (kind, message) = match (values.next(), values.next()) {
1003            (Some("BADCLASS"), Some(msg)) => (ProtocolErrorKind::BadClass, Some(msg.to_owned())),
1004            (Some("BUSY"), Some(msg)) => (ProtocolErrorKind::Busy, Some(msg.to_owned())),
1005            (Some("CLIENT_ERROR"), Some(msg)) => (ProtocolErrorKind::Client, Some(msg.to_owned())),
1006            (Some("ERROR"), None) => (ProtocolErrorKind::Syntax, None),
1007            (Some("ERROR"), Some(msg)) => (ProtocolErrorKind::Syntax, Some(msg.to_owned())),
1008            (Some("NOT_FOUND"), None) => (ProtocolErrorKind::NotFound, None),
1009            (Some("NOT_STORED"), None) => (ProtocolErrorKind::NotStored, None),
1010            (Some("SERVER_ERROR"), Some(msg)) => (ProtocolErrorKind::Server, Some(msg.to_owned())),
1011
1012            _ => return None,
1013        };
1014
1015        Some(ProtocolError { kind, message })
1016    }
1017
1018    async fn send(&mut self, cmd: Command<'_>) -> Result<(), MtopError> {
1019        let cmd_bytes: Vec<u8> = cmd.into();
1020        self.write.write_all(&cmd_bytes).await?;
1021        Ok(self.write.flush().await?)
1022    }
1023}
1024
1025impl fmt::Debug for Memcached {
1026    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1027        write!(f, "Memcached {{ read: <...>, write: <...> }}")
1028    }
1029}
1030
/// A cache key, stored as a `String`.
///
/// The inner field is private: code outside this module obtains a `Key` through
/// `Key::one` or `Key::many`, which validate the value before wrapping it.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
#[repr(transparent)]
pub struct Key(String);
1034
1035impl Key {
1036    const MAX_LENGTH: usize = 250;
1037
1038    pub fn one<T>(val: T) -> Result<Key, MtopError>
1039    where
1040        T: Into<String>,
1041    {
1042        let val = val.into();
1043        if !Self::is_legal_val(&val) {
1044            Err(MtopError::runtime(format!("invalid key {}", val)))
1045        } else {
1046            Ok(Key(val))
1047        }
1048    }
1049
1050    pub fn many<I, T>(vals: I) -> Result<Vec<Key>, MtopError>
1051    where
1052        I: IntoIterator<Item = T>,
1053        T: Into<String>,
1054    {
1055        let iter = vals.into_iter();
1056        let (sz, _) = iter.size_hint();
1057        let mut out = Vec::with_capacity(sz);
1058
1059        for val in iter {
1060            out.push(Self::one(val)?);
1061        }
1062
1063        Ok(out)
1064    }
1065
1066    pub fn len(&self) -> usize {
1067        self.0.len()
1068    }
1069
1070    pub fn is_empty(&self) -> bool {
1071        self.0.is_empty()
1072    }
1073
1074    fn is_legal_val(val: &str) -> bool {
1075        if val.len() > Self::MAX_LENGTH {
1076            return false;
1077        }
1078
1079        for c in val.chars() {
1080            if !c.is_ascii() || c.is_ascii_whitespace() || c.is_ascii_control() {
1081                return false;
1082            }
1083        }
1084
1085        true
1086    }
1087}
1088
1089impl fmt::Display for Key {
1090    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1091        self.0.fmt(f)
1092    }
1093}
1094
1095impl AsRef<str> for Key {
1096    fn as_ref(&self) -> &str {
1097        &self.0
1098    }
1099}
1100
1101impl Borrow<str> for Key {
1102    fn borrow(&self) -> &str {
1103        &self.0
1104    }
1105}
1106
1107#[cfg(test)]
1108mod test {
1109    use super::{ErrorKind, Key, Memcached, Meta, Slab, SlabItem, SlabItems};
1110    use std::io::{Cursor, Error};
1111    use std::pin::Pin;
1112    use std::task::{Context, Poll};
1113    use std::time::Duration;
1114    use tokio::io::AsyncWrite;
1115    use tokio::sync::mpsc::{self, UnboundedSender};
1116
1117    /////////
1118    // key //
1119    /////////
1120
1121    #[test]
1122    fn test_key_one_length() {
1123        let val = "abc".repeat(Key::MAX_LENGTH);
1124        let res = Key::one(val);
1125        assert!(res.is_err());
1126    }
1127
1128    #[test]
1129    fn test_key_one_non_ascii() {
1130        let val = "🤦";
1131        let res = Key::one(val);
1132        assert!(res.is_err());
1133    }
1134
1135    #[test]
1136    fn test_key_one_whitespace() {
1137        let val = "some thing";
1138        let res = Key::one(val);
1139        assert!(res.is_err());
1140    }
1141
1142    #[test]
1143    fn test_key_one_control_char() {
1144        let val = "\x7F";
1145        let res = Key::one(val);
1146        assert!(res.is_err());
1147    }
1148
1149    #[test]
1150    fn test_key_one_success() {
1151        let val = "a-reasonable-key";
1152        let res = Key::one(val);
1153        assert!(res.is_ok());
1154    }
1155
    /// Test double for the write half of a connection: holds the sending side of an
    /// unbounded channel that carries byte buffers.
    struct WriteAdapter {
        // Sending half of the channel; buffers are `Vec<u8>`.
        tx: UnboundedSender<Vec<u8>>,
    }
1159
1160    impl AsyncWrite for WriteAdapter {
1161        fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
1162            self.tx.send(buf.to_owned()).unwrap();
1163            Poll::Ready(Ok(buf.len()))
1164        }
1165
1166        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1167            Poll::Ready(Ok(()))
1168        }
1169
1170        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1171            Poll::Ready(Ok(()))
1172        }
1173    }
1174
1175    /// Create a new receiver channel and `Memcached` instance to read the provided server
1176    /// response. Anything written by the client is able to be read from the receiver channel.
1177    /// NOTE that it is important that the receiver not be dropped by the caller since this will
1178    /// cause writes to the channel to fail from within the client.
1179    macro_rules! client {
1180        () => ({
1181            let (tx, rx) = mpsc::unbounded_channel();
1182            let reads = Vec::new();
1183            (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
1184        });
1185        ($($line:expr),+ $(,)?) => ({
1186            let (tx, rx) = mpsc::unbounded_channel();
1187            let mut reads = Vec::new();
1188            $(reads.extend_from_slice($line.as_bytes());)+
1189            (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
1190        })
1191    }
1192
1193    /////////
1194    // get //
1195    /////////
1196
1197    #[tokio::test]
1198    async fn test_memcached_get_no_key() {
1199        let (_rx, mut client) = client!();
1200        let vals: Vec<String> = vec![];
1201        let keys = Key::many(vals).unwrap();
1202        let res = client.get(&keys).await.unwrap();
1203
1204        assert!(res.is_empty());
1205    }
1206
1207    #[tokio::test]
1208    async fn test_memcached_get_error() {
1209        let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1210        let keys = Key::many(vec!["foo", "baz"]).unwrap();
1211        let res = client.get(&keys).await;
1212
1213        assert!(res.is_err());
1214        let err = res.unwrap_err();
1215        assert_eq!(ErrorKind::Protocol, err.kind());
1216    }
1217
1218    #[tokio::test]
1219    async fn test_memcached_get_miss() {
1220        let (_rx, mut client) = client!("END\r\n");
1221        let keys = Key::many(vec!["foo", "baz"]).unwrap();
1222        let res = client.get(&keys).await.unwrap();
1223
1224        assert!(res.is_empty());
1225    }
1226
1227    #[tokio::test]
1228    async fn test_memcached_get_hit() {
1229        let (_rx, mut client) = client!(
1230            "VALUE foo 32 3 1\r\n",
1231            "bar\r\n",
1232            "VALUE baz 64 3 2\r\n",
1233            "qux\r\n",
1234            "END\r\n",
1235        );
1236        let keys = Key::many(vec!["foo", "baz"]).unwrap();
1237        let res = client.get(&keys).await.unwrap();
1238
1239        let val1 = res.get("foo").unwrap();
1240        assert_eq!("foo", val1.key);
1241        assert_eq!("bar".as_bytes(), val1.data);
1242        assert_eq!(32, val1.flags);
1243        assert_eq!(1, val1.cas);
1244
1245        let val2 = res.get("baz").unwrap();
1246        assert_eq!("baz", val2.key);
1247        assert_eq!("qux".as_bytes(), val2.data);
1248        assert_eq!(64, val2.flags);
1249        assert_eq!(2, val2.cas);
1250    }
1251
1252    //////////
1253    // incr //
1254    //////////
1255
1256    #[tokio::test]
1257    async fn test_memcached_incr_bad_val() {
1258        let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1259        let key = Key::one("test").unwrap();
1260        let res = client.incr(&key, 2).await;
1261
1262        assert!(res.is_err());
1263        let err = res.unwrap_err();
1264        assert_eq!(ErrorKind::Protocol, err.kind());
1265
1266        let bytes = rx.recv().await.unwrap();
1267        let command = String::from_utf8(bytes).unwrap();
1268        assert_eq!("incr test 2\r\n", command);
1269    }
1270
1271    #[tokio::test]
1272    async fn test_memcached_incr_success() {
1273        let (mut rx, mut client) = client!("3\r\n");
1274        let key = Key::one("test").unwrap();
1275        let res = client.incr(&key, 2).await.unwrap();
1276
1277        assert_eq!(3, res);
1278        let bytes = rx.recv().await.unwrap();
1279        let command = String::from_utf8(bytes).unwrap();
1280        assert_eq!("incr test 2\r\n", command);
1281    }
1282
1283    //////////
1284    // decr //
1285    //////////
1286
1287    #[tokio::test]
1288    async fn test_memcached_decr_bad_val() {
1289        let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1290        let key = Key::one("test").unwrap();
1291        let res = client.decr(&key, 1).await;
1292
1293        assert!(res.is_err());
1294        let err = res.unwrap_err();
1295        assert_eq!(ErrorKind::Protocol, err.kind());
1296
1297        let bytes = rx.recv().await.unwrap();
1298        let command = String::from_utf8(bytes).unwrap();
1299        assert_eq!("decr test 1\r\n", command);
1300    }
1301
1302    #[tokio::test]
1303    async fn test_memcached_decr_success() {
1304        let (mut rx, mut client) = client!("3\r\n");
1305        let key = Key::one("test").unwrap();
1306        let res = client.decr(&key, 1).await.unwrap();
1307
1308        assert_eq!(3, res);
1309        let bytes = rx.recv().await.unwrap();
1310        let command = String::from_utf8(bytes).unwrap();
1311        assert_eq!("decr test 1\r\n", command);
1312    }
1313
1314    //////////
1315    // ping //
1316    //////////
1317
1318    #[tokio::test]
1319    async fn test_memcached_ping_bad_val() {
1320        let (mut rx, mut client) = client!("220 localhost ESMTP Postfix\r\n");
1321        let res = client.ping().await;
1322
1323        assert!(res.is_err());
1324        let err = res.unwrap_err();
1325        assert_eq!(ErrorKind::Runtime, err.kind());
1326
1327        let bytes = rx.recv().await.unwrap();
1328        let command = String::from_utf8(bytes).unwrap();
1329        assert_eq!("version\r\n", command);
1330    }
1331
1332    #[tokio::test]
1333    async fn test_memcached_ping_success() {
1334        let (mut rx, mut client) = client!("VERSION 1.6.22\r\n");
1335        client.ping().await.unwrap();
1336
1337        let bytes = rx.recv().await.unwrap();
1338        let command = String::from_utf8(bytes).unwrap();
1339        assert_eq!("version\r\n", command);
1340    }
1341
1342    macro_rules! test_store_command_success {
1343        ($method:ident, $verb:expr) => {
1344            let (mut rx, mut client) = client!("STORED\r\n");
1345            let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1346
1347            assert!(res.is_ok());
1348            let bytes = rx.recv().await.unwrap();
1349            let command = String::from_utf8(bytes).unwrap();
1350            assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1351        };
1352    }
1353
1354    macro_rules! test_store_command_error {
1355        ($method:ident, $verb:expr) => {
1356            let (mut rx, mut client) = client!("NOT_STORED\r\n");
1357            let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1358
1359            assert!(res.is_err());
1360            let err = res.unwrap_err();
1361            assert_eq!(ErrorKind::Protocol, err.kind());
1362
1363            let bytes = rx.recv().await.unwrap();
1364            let command = String::from_utf8(bytes).unwrap();
1365            assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1366        };
1367    }
1368
1369    /////////
1370    // set //
1371    /////////
1372
    // `set` answered with `STORED`: the call succeeds and a `set` command is written.
    #[tokio::test]
    async fn test_memcached_set_success() {
        test_store_command_success!(set, "set");
    }
1377
    // `set` answered with `NOT_STORED`: the call fails with a protocol error.
    #[tokio::test]
    async fn test_memcached_set_error() {
        test_store_command_error!(set, "set");
    }
1382
1383    /////////
1384    // add //
1385    /////////
1386
    // `add` answered with `STORED`: the call succeeds and an `add` command is written.
    #[tokio::test]
    async fn test_memcached_add_success() {
        test_store_command_success!(add, "add");
    }
1391
    // `add` answered with `NOT_STORED`: the call fails with a protocol error.
    #[tokio::test]
    async fn test_memcached_add_error() {
        test_store_command_error!(add, "add");
    }
1396
1397    /////////////
1398    // replace //
1399    /////////////
1400
    // `replace` answered with `STORED`: the call succeeds and a `replace` command is written.
    #[tokio::test]
    async fn test_memcached_replace_success() {
        test_store_command_success!(replace, "replace");
    }
1405
    // `replace` answered with `NOT_STORED`: the call fails with a protocol error.
    #[tokio::test]
    async fn test_memcached_replace_error() {
        test_store_command_error!(replace, "replace");
    }
1410
1411    ///////////
1412    // touch //
1413    ///////////
1414
1415    #[tokio::test]
1416    async fn test_memcached_touch_success() {
1417        let (mut rx, mut client) = client!("TOUCHED\r\n");
1418        let key = Key::one("test").unwrap();
1419        let res = client.touch(&key, 300).await;
1420
1421        assert!(res.is_ok());
1422        let bytes = rx.recv().await.unwrap();
1423        let command = String::from_utf8(bytes).unwrap();
1424        assert_eq!("touch test 300\r\n", command);
1425    }
1426
1427    #[tokio::test]
1428    async fn test_memcached_touch_error() {
1429        let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1430        let key = Key::one("test").unwrap();
1431        let res = client.touch(&key, 300).await;
1432
1433        assert!(res.is_err());
1434        let err = res.unwrap_err();
1435        assert_eq!(ErrorKind::Protocol, err.kind());
1436
1437        let bytes = rx.recv().await.unwrap();
1438        let command = String::from_utf8(bytes).unwrap();
1439        assert_eq!("touch test 300\r\n", command);
1440    }
1441
1442    ////////////
1443    // delete //
1444    ////////////
1445
1446    #[tokio::test]
1447    async fn test_memcached_delete_success() {
1448        let (mut rx, mut client) = client!("DELETED\r\n");
1449        let key = Key::one("test").unwrap();
1450        let res = client.delete(&key).await;
1451
1452        assert!(res.is_ok());
1453        let bytes = rx.recv().await.unwrap();
1454        let command = String::from_utf8(bytes).unwrap();
1455        assert_eq!("delete test\r\n", command);
1456    }
1457
1458    #[tokio::test]
1459    async fn test_memcached_delete_error() {
1460        let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1461        let key = Key::one("test").unwrap();
1462        let res = client.delete(&key).await;
1463
1464        assert!(res.is_err());
1465        let err = res.unwrap_err();
1466        assert_eq!(ErrorKind::Protocol, err.kind());
1467
1468        let bytes = rx.recv().await.unwrap();
1469        let command = String::from_utf8(bytes).unwrap();
1470        assert_eq!("delete test\r\n", command);
1471    }
1472
1473    ///////////////
1474    // flush_all //
1475    ///////////////
1476
1477    #[tokio::test]
1478    async fn test_memcached_flush_all_with_wait_success() {
1479        let (mut rx, mut client) = client!("OK\r\n");
1480        let wait = Some(Duration::from_secs(25));
1481        let res = client.flush_all(wait).await;
1482
1483        assert!(res.is_ok());
1484        let bytes = rx.recv().await.unwrap();
1485        let command = String::from_utf8(bytes).unwrap();
1486        assert_eq!("flush_all 25\r\n", command);
1487    }
1488
1489    #[tokio::test]
1490    async fn test_memcached_flush_all_with_wait_error() {
1491        let (mut rx, mut client) = client!("ERROR\r\n");
1492        let wait = Some(Duration::from_secs(25));
1493        let res = client.flush_all(wait).await;
1494
1495        assert!(res.is_err());
1496        let err = res.unwrap_err();
1497        assert_eq!(ErrorKind::Protocol, err.kind());
1498
1499        let bytes = rx.recv().await.unwrap();
1500        let command = String::from_utf8(bytes).unwrap();
1501        assert_eq!("flush_all 25\r\n", command);
1502    }
1503
1504    #[tokio::test]
1505    async fn test_memcached_flush_all_no_wait_success() {
1506        let (mut rx, mut client) = client!("OK\r\n");
1507        let res = client.flush_all(None).await;
1508
1509        assert!(res.is_ok());
1510        let bytes = rx.recv().await.unwrap();
1511        let command = String::from_utf8(bytes).unwrap();
1512        assert_eq!("flush_all\r\n", command);
1513    }
1514
1515    #[tokio::test]
1516    async fn test_memcached_flush_all_no_wait_error() {
1517        let (mut rx, mut client) = client!("ERROR\r\n");
1518        let res = client.flush_all(None).await;
1519
1520        assert!(res.is_err());
1521        let err = res.unwrap_err();
1522        assert_eq!(ErrorKind::Protocol, err.kind());
1523
1524        let bytes = rx.recv().await.unwrap();
1525        let command = String::from_utf8(bytes).unwrap();
1526        assert_eq!("flush_all\r\n", command);
1527    }
1528
1529    ///////////
1530    // stats //
1531    ///////////
1532
1533    #[tokio::test]
1534    async fn test_memcached_stats_empty() {
1535        let (_rx, mut client) = client!("END\r\n");
1536        let res = client.stats().await;
1537
1538        assert!(res.is_err());
1539        let err = res.unwrap_err();
1540        assert_eq!(ErrorKind::Runtime, err.kind());
1541    }
1542
1543    #[tokio::test]
1544    async fn test_memcached_stats_error() {
1545        let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1546        let res = client.stats().await;
1547
1548        assert!(res.is_err());
1549        let err = res.unwrap_err();
1550        assert_eq!(ErrorKind::Protocol, err.kind());
1551    }
1552
1553    #[tokio::test]
1554    async fn test_memcached_stats_success() {
1555        let (_rx, mut client) = client!(
1556            "STAT pid 1525\r\n",
1557            "STAT uptime 271984\r\n",
1558            "STAT time 1687212809\r\n",
1559            "STAT version 1.6.14\r\n",
1560            "STAT libevent 2.1.12-stable\r\n",
1561            "STAT pointer_size 64\r\n",
1562            "STAT rusage_user 17.544323\r\n",
1563            "STAT rusage_system 11.830461\r\n",
1564            "STAT max_connections 1024\r\n",
1565            "STAT curr_connections 1\r\n",
1566            "STAT total_connections 3\r\n",
1567            "STAT rejected_connections 0\r\n",
1568            "STAT connection_structures 2\r\n",
1569            "STAT response_obj_oom 0\r\n",
1570            "STAT response_obj_count 1\r\n",
1571            "STAT response_obj_bytes 32768\r\n",
1572            "STAT read_buf_count 4\r\n",
1573            "STAT read_buf_bytes 65536\r\n",
1574            "STAT read_buf_bytes_free 16384\r\n",
1575            "STAT read_buf_oom 0\r\n",
1576            "STAT reserved_fds 20\r\n",
1577            "STAT cmd_get 1\r\n",
1578            "STAT cmd_set 0\r\n",
1579            "STAT cmd_flush 0\r\n",
1580            "STAT cmd_touch 0\r\n",
1581            "STAT cmd_meta 0\r\n",
1582            "STAT get_hits 0\r\n",
1583            "STAT get_misses 1\r\n",
1584            "STAT get_expired 0\r\n",
1585            "STAT get_flushed 0\r\n",
1586            "STAT delete_misses 0\r\n",
1587            "STAT delete_hits 0\r\n",
1588            "STAT incr_misses 0\r\n",
1589            "STAT incr_hits 0\r\n",
1590            "STAT decr_misses 0\r\n",
1591            "STAT decr_hits 0\r\n",
1592            "STAT cas_misses 0\r\n",
1593            "STAT cas_hits 0\r\n",
1594            "STAT cas_badval 0\r\n",
1595            "STAT touch_hits 0\r\n",
1596            "STAT touch_misses 0\r\n",
1597            "STAT store_too_large 0\r\n",
1598            "STAT store_no_memory 0\r\n",
1599            "STAT auth_cmds 0\r\n",
1600            "STAT auth_errors 0\r\n",
1601            "STAT bytes_read 16\r\n",
1602            "STAT bytes_written 7\r\n",
1603            "STAT limit_maxbytes 67108864\r\n",
1604            "STAT accepting_conns 1\r\n",
1605            "STAT listen_disabled_num 0\r\n",
1606            "STAT time_in_listen_disabled_us 0\r\n",
1607            "STAT threads 4\r\n",
1608            "STAT conn_yields 0\r\n",
1609            "STAT hash_power_level 16\r\n",
1610            "STAT hash_bytes 524288\r\n",
1611            "STAT hash_is_expanding 0\r\n",
1612            "STAT slab_reassign_rescues 0\r\n",
1613            "STAT slab_reassign_chunk_rescues 0\r\n",
1614            "STAT slab_reassign_evictions_nomem 0\r\n",
1615            "STAT slab_reassign_inline_reclaim 0\r\n",
1616            "STAT slab_reassign_busy_items 0\r\n",
1617            "STAT slab_reassign_busy_deletes 0\r\n",
1618            "STAT slab_reassign_running 0\r\n",
1619            "STAT slabs_moved 0\r\n",
1620            "STAT lru_crawler_running 0\r\n",
1621            "STAT lru_crawler_starts 105\r\n",
1622            "STAT lru_maintainer_juggles 271976\r\n",
1623            "STAT malloc_fails 0\r\n",
1624            "STAT log_worker_dropped 0\r\n",
1625            "STAT log_worker_written 0\r\n",
1626            "STAT log_watcher_skipped 0\r\n",
1627            "STAT log_watcher_sent 0\r\n",
1628            "STAT log_watchers 0\r\n",
1629            "STAT unexpected_napi_ids 0\r\n",
1630            "STAT round_robin_fallback 0\r\n",
1631            "STAT bytes 0\r\n",
1632            "STAT curr_items 0\r\n",
1633            "STAT total_items 0\r\n",
1634            "STAT slab_global_page_pool 0\r\n",
1635            "STAT expired_unfetched 0\r\n",
1636            "STAT evicted_unfetched 0\r\n",
1637            "STAT evicted_active 0\r\n",
1638            "STAT evictions 0\r\n",
1639            "STAT reclaimed 0\r\n",
1640            "STAT crawler_reclaimed 0\r\n",
1641            "STAT crawler_items_checked 0\r\n",
1642            "STAT lrutail_reflocked 0\r\n",
1643            "STAT moves_to_cold 0\r\n",
1644            "STAT moves_to_warm 0\r\n",
1645            "STAT moves_within_lru 0\r\n",
1646            "STAT direct_reclaims 0\r\n",
1647            "STAT lru_bumps_dropped 0\r\n",
1648            "END\r\n",
1649        );
1650        let res = client.stats().await.unwrap();
1651
1652        assert_eq!(0, res.cmd_set);
1653        assert_eq!(1, res.cmd_get);
1654        assert_eq!(1, res.get_misses);
1655        assert_eq!(0, res.get_hits);
1656    }
1657
1658    ///////////
1659    // slabs //
1660    ///////////
1661
1662    #[tokio::test]
1663    async fn test_memcached_slabs_empty() {
1664        let (_rx, mut client) = client!("STAT active_slabs 0\r\n", "STAT total_malloced 0\r\n", "END\r\n");
1665        let res = client.slabs().await.unwrap();
1666
1667        assert!(res.slabs.is_empty());
1668    }
1669
1670    #[tokio::test]
1671    async fn test_memcached_slabs_error() {
1672        let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1673        let res = client.slabs().await;
1674
1675        assert!(res.is_err());
1676        let err = res.unwrap_err();
1677        assert_eq!(ErrorKind::Protocol, err.kind());
1678    }
1679
1680    #[tokio::test]
1681    async fn test_memcached_slabs_success() {
1682        let (_rx, mut client) = client!(
1683            "STAT 6:chunk_size 304\r\n",
1684            "STAT 6:chunks_per_page 3449\r\n",
1685            "STAT 6:total_pages 1\r\n",
1686            "STAT 6:total_chunks 3449\r\n",
1687            "STAT 6:used_chunks 1\r\n",
1688            "STAT 6:free_chunks 3448\r\n",
1689            "STAT 6:free_chunks_end 0\r\n",
1690            "STAT 6:get_hits 951\r\n",
1691            "STAT 6:cmd_set 100\r\n",
1692            "STAT 6:delete_hits 0\r\n",
1693            "STAT 6:incr_hits 0\r\n",
1694            "STAT 6:decr_hits 0\r\n",
1695            "STAT 6:cas_hits 0\r\n",
1696            "STAT 6:cas_badval 0\r\n",
1697            "STAT 6:touch_hits 0\r\n",
1698            "STAT 7:chunk_size 384\r\n",
1699            "STAT 7:chunks_per_page 2730\r\n",
1700            "STAT 7:total_pages 1\r\n",
1701            "STAT 7:total_chunks 2730\r\n",
1702            "STAT 7:used_chunks 5\r\n",
1703            "STAT 7:free_chunks 2725\r\n",
1704            "STAT 7:free_chunks_end 0\r\n",
1705            "STAT 7:get_hits 4792\r\n",
1706            "STAT 7:cmd_set 520\r\n",
1707            "STAT 7:delete_hits 0\r\n",
1708            "STAT 7:incr_hits 0\r\n",
1709            "STAT 7:decr_hits 0\r\n",
1710            "STAT 7:cas_hits 0\r\n",
1711            "STAT 7:cas_badval 0\r\n",
1712            "STAT 7:touch_hits 0\r\n",
1713            "STAT active_slabs 2\r\n",
1714            "STAT total_malloced 30408704\r\n",
1715            "END\r\n",
1716        );
1717        let res = client.slabs().await.unwrap();
1718
1719        let expected = vec![
1720            Slab {
1721                id: 6,
1722                chunk_size: 304,
1723                chunks_per_page: 3449,
1724                total_pages: 1,
1725                total_chunks: 3449,
1726                used_chunks: 1,
1727                free_chunks: 3448,
1728                get_hits: 951,
1729                cmd_set: 100,
1730                delete_hits: 0,
1731                incr_hits: 0,
1732                decr_hits: 0,
1733                cas_hits: 0,
1734                cas_badval: 0,
1735                touch_hits: 0,
1736            },
1737            Slab {
1738                id: 7,
1739                chunk_size: 384,
1740                chunks_per_page: 2730,
1741                total_pages: 1,
1742                total_chunks: 2730,
1743                used_chunks: 5,
1744                free_chunks: 2725,
1745                get_hits: 4792,
1746                cmd_set: 520,
1747                delete_hits: 0,
1748                incr_hits: 0,
1749                decr_hits: 0,
1750                cas_hits: 0,
1751                cas_badval: 0,
1752                touch_hits: 0,
1753            },
1754        ];
1755
1756        assert_eq!(expected, res.slabs);
1757    }
1758
1759    ///////////
1760    // items //
1761    ///////////
1762
1763    #[tokio::test]
1764    async fn test_memcached_items_empty() {
1765        let (_rx, mut client) = client!();
1766        let res = client.items().await.unwrap();
1767
1768        assert!(res.is_empty());
1769    }
1770
1771    #[tokio::test]
1772    async fn test_memcached_items_error() {
1773        let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1774        let res = client.items().await;
1775
1776        assert!(res.is_err());
1777        let err = res.unwrap_err();
1778        assert_eq!(ErrorKind::Protocol, err.kind());
1779    }
1780
1781    #[tokio::test]
1782    async fn test_memcached_items_success() {
1783        let (_rx, mut client) = client!(
1784            "STAT items:39:number 3\r\n",
1785            "STAT items:39:number_hot 0\r\n",
1786            "STAT items:39:number_warm 1\r\n",
1787            "STAT items:39:number_cold 2\r\n",
1788            "STAT items:39:age_hot 0\r\n",
1789            "STAT items:39:age_warm 7\r\n",
1790            "STAT items:39:age 8\r\n",
1791            "STAT items:39:mem_requested 1535788\r\n",
1792            "STAT items:39:evicted 1646\r\n",
1793            "STAT items:39:evicted_nonzero 1646\r\n",
1794            "STAT items:39:evicted_time 0\r\n",
1795            "STAT items:39:outofmemory 9\r\n",
1796            "STAT items:39:tailrepairs 0\r\n",
1797            "STAT items:39:reclaimed 13\r\n",
1798            "STAT items:39:expired_unfetched 4\r\n",
1799            "STAT items:39:evicted_unfetched 202\r\n",
1800            "STAT items:39:evicted_active 6\r\n",
1801            "STAT items:39:crawler_reclaimed 0\r\n",
1802            "STAT items:39:crawler_items_checked 40\r\n",
1803            "STAT items:39:lrutail_reflocked 17365\r\n",
1804            "STAT items:39:moves_to_cold 8703\r\n",
1805            "STAT items:39:moves_to_warm 7285\r\n",
1806            "STAT items:39:moves_within_lru 3651\r\n",
1807            "STAT items:39:direct_reclaims 1949\r\n",
1808            "STAT items:39:hits_to_hot 894\r\n",
1809            "STAT items:39:hits_to_warm 4079\r\n",
1810            "STAT items:39:hits_to_cold 8043\r\n",
1811            "STAT items:39:hits_to_temp 0\r\n",
1812            "END\r\n",
1813        );
1814        let res = client.items().await.unwrap();
1815
1816        let expected = SlabItems {
1817            items: vec![SlabItem {
1818                id: 39,
1819                number: 3,
1820                number_hot: 0,
1821                number_warm: 1,
1822                number_cold: 2,
1823                age_hot: 0,
1824                age_warm: 7,
1825                age: 8,
1826                mem_requested: 1535788,
1827                evicted: 1646,
1828                evicted_nonzero: 1646,
1829                evicted_time: 0,
1830                out_of_memory: 9,
1831                tail_repairs: 0,
1832                reclaimed: 13,
1833                expired_unfetched: 4,
1834                evicted_unfetched: 202,
1835                evicted_active: 6,
1836                crawler_reclaimed: 0,
1837                crawler_items_checked: 40,
1838                lrutail_reflocked: 17365,
1839                moves_to_cold: 8703,
1840                moves_to_warm: 7285,
1841                moves_within_lru: 3651,
1842                direct_reclaims: 1949,
1843                hits_to_hot: 894,
1844                hits_to_warm: 4079,
1845                hits_to_cold: 8043,
1846                hits_to_temp: 0,
1847            }],
1848        };
1849
1850        assert_eq!(expected, res);
1851    }
1852
1853    //////////
1854    // meta //
1855    //////////
1856
1857    #[tokio::test]
1858    async fn test_memcached_metas_empty() {
1859        let (_rx, mut client) = client!();
1860        let res = client.metas().await.unwrap();
1861
1862        assert!(res.is_empty());
1863    }
1864
1865    #[tokio::test]
1866    async fn test_memcached_metas_error() {
1867        let (_rx, mut client) = client!("BUSY crawler is busy\r\n",);
1868        let res = client.metas().await;
1869
1870        assert!(res.is_err());
1871        let err = res.unwrap_err();
1872        assert_eq!(ErrorKind::Protocol, err.kind());
1873    }
1874
1875    #[tokio::test]
1876    async fn test_memcached_metas_success() {
1877        let (_rx, mut client) = client!(
1878            "key=memcached%2Fmurmur3_hash.c exp=1687216956 la=1687216656 cas=259502 fetch=yes cls=17 size=2912\r\n",
1879            "key=memcached%2Fmd5.h exp=1687216956 la=1687216656 cas=259731 fetch=yes cls=17 size=3593\r\n",
1880            "END\r\n",
1881        );
1882        let res = client.metas().await.unwrap();
1883
1884        let expected = vec![
1885            Meta {
1886                key: "memcached/murmur3_hash.c".to_string(),
1887                expires: 1687216956,
1888                size: 2912,
1889            },
1890            Meta {
1891                key: "memcached/md5.h".to_string(),
1892                expires: 1687216956,
1893                size: 3593,
1894            },
1895        ];
1896
1897        assert_eq!(expected, res);
1898    }
1899}