1use std::borrow::Borrow;
2use std::cmp::Ordering;
3use std::collections::{BTreeSet, HashMap};
4use std::error;
5use std::fmt;
6use std::io;
7use std::ops::Deref;
8use std::str::FromStr;
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, Lines};
11
/// Server-wide statistics built from the key/value pairs of a `stats` response.
///
/// Every field is parsed from the map entry that has the same name as the
/// field, with two exceptions: `server_time` is read from the `time` entry and
/// `max_bytes` is read from the `limit_maxbytes` entry.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Stats {
    pub pid: i64,
    pub uptime: u64,
    /// Parsed from the `time` entry.
    pub server_time: i64,
    pub threads: u64,
    pub version: String,

    pub rusage_user: f64,
    pub rusage_system: f64,

    pub max_connections: u64,
    pub curr_connections: u64,
    pub total_connections: u64,
    pub rejected_connections: u64,

    pub cmd_get: u64,
    pub cmd_set: u64,
    pub cmd_flush: u64,
    pub cmd_touch: u64,
    pub cmd_meta: u64,

    pub get_hits: u64,
    pub get_misses: u64,
    pub get_expired: u64,
    pub get_flushed: u64,

    pub store_too_large: u64,
    pub store_no_memory: u64,

    pub delete_hits: u64,
    pub delete_misses: u64,

    pub incr_hits: u64,
    pub incr_misses: u64,
    pub decr_hits: u64,
    pub decr_misses: u64,

    pub touch_hits: u64,
    pub touch_misses: u64,

    pub bytes_read: u64,
    pub bytes_written: u64,
    pub bytes: u64,
    /// Parsed from the `limit_maxbytes` entry.
    pub max_bytes: u64,

    pub curr_items: u64,
    pub total_items: u64,
    pub evictions: u64,
}
73
74impl TryFrom<&HashMap<String, String>> for Stats {
75 type Error = MtopError;
76
77 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
78 Ok(Stats {
79 pid: parse_field("pid", value)?,
80 uptime: parse_field("uptime", value)?,
81 server_time: parse_field("time", value)?,
82 version: parse_field("version", value)?,
83 threads: parse_field("threads", value)?,
84
85 rusage_user: parse_field("rusage_user", value)?,
86 rusage_system: parse_field("rusage_system", value)?,
87
88 max_connections: parse_field("max_connections", value)?,
89 curr_connections: parse_field("curr_connections", value)?,
90 total_connections: parse_field("total_connections", value)?,
91 rejected_connections: parse_field("rejected_connections", value)?,
92
93 cmd_get: parse_field("cmd_get", value)?,
94 cmd_set: parse_field("cmd_set", value)?,
95 cmd_flush: parse_field("cmd_flush", value)?,
96 cmd_touch: parse_field("cmd_touch", value)?,
97 cmd_meta: parse_field("cmd_meta", value)?,
98
99 get_hits: parse_field("get_hits", value)?,
100 get_misses: parse_field("get_misses", value)?,
101 get_expired: parse_field("get_expired", value)?,
102 get_flushed: parse_field("get_flushed", value)?,
103
104 store_too_large: parse_field("store_too_large", value)?,
105 store_no_memory: parse_field("store_no_memory", value)?,
106
107 delete_hits: parse_field("delete_hits", value)?,
108 delete_misses: parse_field("delete_misses", value)?,
109
110 incr_hits: parse_field("incr_hits", value)?,
111 incr_misses: parse_field("incr_misses", value)?,
112
113 decr_hits: parse_field("decr_hits", value)?,
114 decr_misses: parse_field("decr_misses", value)?,
115
116 touch_hits: parse_field("touch_hits", value)?,
117 touch_misses: parse_field("touch_misses", value)?,
118
119 bytes_read: parse_field("bytes_read", value)?,
120 bytes_written: parse_field("bytes_written", value)?,
121 bytes: parse_field("bytes", value)?,
122 max_bytes: parse_field("limit_maxbytes", value)?,
123
124 curr_items: parse_field("curr_items", value)?,
125 total_items: parse_field("total_items", value)?,
126 evictions: parse_field("evictions", value)?,
127 })
128 }
129}
130
/// Counters for a single slab, identified by `id`.
///
/// Built by the `TryFrom<&HashMap<String, String>>` impl for `Slabs`, which
/// reads every field other than `id` from the map entry `<id>:<field name>`.
///
/// Note that ordering (`Ord`/`PartialOrd`) looks at `id` only, while the
/// derived equality compares every field.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Slab {
    pub id: u64,
    pub chunk_size: u64,
    pub chunks_per_page: u64,
    pub total_pages: u64,
    pub total_chunks: u64,
    pub used_chunks: u64,
    pub free_chunks: u64,
    pub get_hits: u64,
    pub cmd_set: u64,
    pub delete_hits: u64,
    pub incr_hits: u64,
    pub decr_hits: u64,
    pub cas_hits: u64,
    pub cas_badval: u64,
    pub touch_hits: u64,
}
149
150impl PartialOrd for Slab {
151 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152 Some(self.cmp(other))
153 }
154}
155
156impl Ord for Slab {
157 fn cmp(&self, other: &Self) -> Ordering {
158 self.id.cmp(&other.id)
159 }
160}
161
/// Collection of `Slab` entries.
///
/// When built through `TryFrom<&HashMap<String, String>>` the entries are
/// stored in ascending `id` order.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
#[repr(transparent)]
pub struct Slabs {
    slabs: Vec<Slab>,
}
167
168impl Slabs {
169 pub fn iter(&self) -> impl ExactSizeIterator<Item = &Slab> {
170 self.slabs.iter()
171 }
172
173 pub fn len(&self) -> usize {
174 self.slabs.len()
175 }
176
177 pub fn is_empty(&self) -> bool {
178 self.slabs.is_empty()
179 }
180
181 pub fn find_for_size(&self, size: u64) -> Option<&Slab> {
182 self.slabs
186 .get(self.slabs.partition_point(|s| s.chunk_size < size))
187 .or_else(|| self.slabs.last())
188 }
189}
190
191impl IntoIterator for Slabs {
192 type Item = Slab;
193 type IntoIter = std::vec::IntoIter<Slab>;
194
195 fn into_iter(self) -> Self::IntoIter {
196 self.slabs.into_iter()
197 }
198}
199
200impl TryFrom<&HashMap<String, String>> for Slabs {
201 type Error = MtopError;
202
203 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
204 let mut ids = BTreeSet::new();
209 for k in value.keys() {
210 let key_id: Option<u64> = k.split_once(':').map(|(raw, _rest)| raw).and_then(|raw| raw.parse().ok());
211
212 if let Some(id) = key_id {
213 ids.insert(id);
214 }
215 }
216
217 let mut slabs = Vec::with_capacity(ids.len());
218
219 for id in ids {
220 slabs.push(Slab {
221 id,
222 chunk_size: parse_field(&format!("{}:chunk_size", id), value)?,
223 chunks_per_page: parse_field(&format!("{}:chunks_per_page", id), value)?,
224 total_pages: parse_field(&format!("{}:total_pages", id), value)?,
225 total_chunks: parse_field(&format!("{}:total_chunks", id), value)?,
226 used_chunks: parse_field(&format!("{}:used_chunks", id), value)?,
227 free_chunks: parse_field(&format!("{}:free_chunks", id), value)?,
228 get_hits: parse_field(&format!("{}:get_hits", id), value)?,
229 cmd_set: parse_field(&format!("{}:cmd_set", id), value)?,
230 delete_hits: parse_field(&format!("{}:delete_hits", id), value)?,
231 incr_hits: parse_field(&format!("{}:incr_hits", id), value)?,
232 decr_hits: parse_field(&format!("{}:decr_hits", id), value)?,
233 cas_hits: parse_field(&format!("{}:cas_hits", id), value)?,
234 cas_badval: parse_field(&format!("{}:cas_badval", id), value)?,
235 touch_hits: parse_field(&format!("{}:touch_hits", id), value)?,
236 })
237 }
238
239 Ok(Self { slabs })
240 }
241}
242
/// Item counters for a single slab, identified by `id`.
///
/// Built by the `TryFrom<&HashMap<String, String>>` impl for `SlabItems`, which
/// reads every field other than `id` from the map entry
/// `items:<id>:<field name>`. Two entries are spelled differently from their
/// fields: `out_of_memory` comes from `outofmemory` and `tail_repairs` comes
/// from `tailrepairs`.
///
/// Note that ordering (`Ord`/`PartialOrd`) looks at `id` only, while the
/// derived equality compares every field.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct SlabItem {
    pub id: u64,
    pub number: u64,
    pub number_hot: u64,
    pub number_warm: u64,
    pub number_cold: u64,
    pub age_hot: u64,
    pub age_warm: u64,
    pub age: u64,
    pub mem_requested: u64,
    pub evicted: u64,
    pub evicted_nonzero: u64,
    pub evicted_time: u64,
    /// Parsed from the `items:<id>:outofmemory` entry.
    pub out_of_memory: u64,
    /// Parsed from the `items:<id>:tailrepairs` entry.
    pub tail_repairs: u64,
    pub reclaimed: u64,
    pub expired_unfetched: u64,
    pub evicted_unfetched: u64,
    pub evicted_active: u64,
    pub crawler_reclaimed: u64,
    pub crawler_items_checked: u64,
    pub lrutail_reflocked: u64,
    pub moves_to_cold: u64,
    pub moves_to_warm: u64,
    pub moves_within_lru: u64,
    pub direct_reclaims: u64,
    pub hits_to_hot: u64,
    pub hits_to_warm: u64,
    pub hits_to_cold: u64,
    pub hits_to_temp: u64,
}
275
276impl PartialOrd for SlabItem {
277 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
278 Some(self.cmp(other))
279 }
280}
281
282impl Ord for SlabItem {
283 fn cmp(&self, other: &Self) -> Ordering {
284 self.id.cmp(&other.id)
285 }
286}
287
/// Collection of `SlabItem` entries.
///
/// When built through `TryFrom<&HashMap<String, String>>` the entries are
/// stored in ascending `id` order.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
#[repr(transparent)]
pub struct SlabItems {
    items: Vec<SlabItem>,
}
293
294impl SlabItems {
295 pub fn iter(&self) -> impl ExactSizeIterator<Item = &SlabItem> {
296 self.items.iter()
297 }
298
299 pub fn len(&self) -> usize {
300 self.items.len()
301 }
302
303 pub fn is_empty(&self) -> bool {
304 self.items.is_empty()
305 }
306}
307
308impl IntoIterator for SlabItems {
309 type Item = SlabItem;
310 type IntoIter = std::vec::IntoIter<SlabItem>;
311
312 fn into_iter(self) -> Self::IntoIter {
313 self.items.into_iter()
314 }
315}
316
317impl TryFrom<&HashMap<String, String>> for SlabItems {
318 type Error = MtopError;
319
320 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
321 let mut ids = BTreeSet::new();
326 for k in value.keys() {
327 let key_id: Option<u64> = k
328 .trim_start_matches("items:")
329 .split_once(':')
330 .map(|(raw, _rest)| raw)
331 .and_then(|raw| raw.parse().ok());
332
333 if let Some(id) = key_id {
334 ids.insert(id);
335 }
336 }
337
338 let mut items = Vec::with_capacity(ids.len());
339
340 for id in ids {
341 items.push(SlabItem {
342 id,
343 number: parse_field(&format!("items:{}:number", id), value)?,
344 number_hot: parse_field(&format!("items:{}:number_hot", id), value)?,
345 number_warm: parse_field(&format!("items:{}:number_warm", id), value)?,
346 number_cold: parse_field(&format!("items:{}:number_cold", id), value)?,
347 age_hot: parse_field(&format!("items:{}:age_hot", id), value)?,
348 age_warm: parse_field(&format!("items:{}:age_warm", id), value)?,
349 age: parse_field(&format!("items:{}:age", id), value)?,
350 mem_requested: parse_field(&format!("items:{}:mem_requested", id), value)?,
351 evicted: parse_field(&format!("items:{}:evicted", id), value)?,
352 evicted_nonzero: parse_field(&format!("items:{}:evicted_nonzero", id), value)?,
353 evicted_time: parse_field(&format!("items:{}:evicted_time", id), value)?,
354 out_of_memory: parse_field(&format!("items:{}:outofmemory", id), value)?,
355 tail_repairs: parse_field(&format!("items:{}:tailrepairs", id), value)?,
356 reclaimed: parse_field(&format!("items:{}:reclaimed", id), value)?,
357 expired_unfetched: parse_field(&format!("items:{}:expired_unfetched", id), value)?,
358 evicted_unfetched: parse_field(&format!("items:{}:evicted_unfetched", id), value)?,
359 evicted_active: parse_field(&format!("items:{}:evicted_active", id), value)?,
360 crawler_reclaimed: parse_field(&format!("items:{}:crawler_reclaimed", id), value)?,
361 crawler_items_checked: parse_field(&format!("items:{}:crawler_items_checked", id), value)?,
362 lrutail_reflocked: parse_field(&format!("items:{}:lrutail_reflocked", id), value)?,
363 moves_to_cold: parse_field(&format!("items:{}:moves_to_cold", id), value)?,
364 moves_to_warm: parse_field(&format!("items:{}:moves_to_warm", id), value)?,
365 moves_within_lru: parse_field(&format!("items:{}:moves_within_lru", id), value)?,
366 direct_reclaims: parse_field(&format!("items:{}:direct_reclaims", id), value)?,
367 hits_to_hot: parse_field(&format!("items:{}:hits_to_hot", id), value)?,
368 hits_to_warm: parse_field(&format!("items:{}:hits_to_warm", id), value)?,
369 hits_to_cold: parse_field(&format!("items:{}:hits_to_cold", id), value)?,
370 hits_to_temp: parse_field(&format!("items:{}:hits_to_temp", id), value)?,
371 })
372 }
373
374 Ok(Self { items })
375 }
376}
377
/// Metadata for a single cached item, parsed from one metadump line.
///
/// Ordering (`Ord`/`PartialOrd`) looks at `key` only, while the derived
/// equality compares every field.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Meta {
    /// Parsed from the `key` attribute.
    pub key: String,
    /// Parsed from the `exp` attribute.
    pub expires: i64,
    /// Parsed from the `size` attribute.
    pub size: u64,
}
384
impl Meta {
    /// Names of the metadump attributes that are kept when a line is parsed;
    /// attributes with any other name are skipped by the parser.
    const KEYS: &'static [&'static str] = &["key", "exp", "size"];
}
391
392impl TryFrom<&HashMap<String, String>> for Meta {
393 type Error = MtopError;
394
395 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
396 Ok(Meta {
397 key: parse_field("key", value)?,
398 expires: parse_field("exp", value)?,
399 size: parse_field("size", value)?,
400 })
401 }
402}
403
404impl PartialOrd for Meta {
405 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
406 Some(self.cmp(other))
407 }
408}
409
410impl Ord for Meta {
411 fn cmp(&self, other: &Self) -> Ordering {
412 self.key.cmp(&other.key)
413 }
414}
415
/// A single entry returned by a `gets` request.
///
/// `key`, `flags` and `cas` come from the `VALUE <key> <flags> <len> <cas>`
/// response line; `data` holds the `<len>` payload bytes that follow it.
///
/// Ordering (`Ord`/`PartialOrd`) looks at `key` only, while the derived
/// equality compares every field.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Value {
    pub key: String,
    pub cas: u64,
    pub flags: u64,
    pub data: Vec<u8>,
}
423
424impl PartialOrd for Value {
425 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
426 Some(self.cmp(other))
427 }
428}
429
430impl Ord for Value {
431 fn cmp(&self, other: &Self) -> Ordering {
432 self.key.cmp(&other.key)
433 }
434}
435
436fn parse_field<T>(key: &str, map: &HashMap<String, String>) -> Result<T, MtopError>
437where
438 T: FromStr,
439 <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
440{
441 map.get(key)
442 .ok_or_else(|| MtopError::runtime(format!("field {} missing", key)))
443 .and_then(|v| {
444 v.parse()
445 .map_err(|e| MtopError::runtime_cause(format!("field {} value '{}'", key, v), e))
446 })
447}
448
449fn parse_value<T>(val: &str, line: &str) -> Result<T, MtopError>
450where
451 T: FromStr + fmt::Display,
452 <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
453{
454 val.parse()
455 .map_err(|e| MtopError::runtime_cause(format!("parsing {} from '{}'", val, line), e))
456}
457
/// Broad category of an `MtopError`, as reported by `MtopError::kind`.
#[derive(Debug, PartialOrd, PartialEq, Copy, Clone)]
pub enum ErrorKind {
    /// Set by `MtopError::runtime` and `MtopError::runtime_cause`.
    Runtime,
    /// Set by `MtopError::timeout` and by conversions from `io::Error`.
    IO,
    /// Set by the conversion from `ProtocolError`.
    Protocol,
    /// Set by `MtopError::configuration` and `MtopError::configuration_cause`.
    Configuration,
}
465
466impl fmt::Display for ErrorKind {
467 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
468 match self {
469 Self::Runtime => write!(f, "runtime error"),
470 Self::IO => write!(f, "io error"),
471 Self::Protocol => write!(f, "protocol error"),
472 Self::Configuration => write!(f, "configuration error"),
473 }
474 }
475}
476
/// Internal payload of an `MtopError`.
#[derive(Debug)]
enum ErrorRepr {
    /// A message with no underlying error.
    Message(String),
    /// An underlying error with no extra message.
    Cause(Box<dyn error::Error + Send + Sync + 'static>),
    /// A message together with the underlying error that triggered it.
    MessageCause(String, Box<dyn error::Error + Send + Sync + 'static>),
}
483
/// Error type returned by the fallible operations in this file.
///
/// Carries a coarse `ErrorKind` plus a message and/or an underlying cause.
#[derive(Debug)]
pub struct MtopError {
    kind: ErrorKind,
    repr: ErrorRepr,
}
489
490impl MtopError {
491 pub fn runtime<S>(msg: S) -> MtopError
492 where
493 S: Into<String>,
494 {
495 MtopError {
496 kind: ErrorKind::Runtime,
497 repr: ErrorRepr::Message(msg.into()),
498 }
499 }
500
501 pub fn runtime_cause<S, E>(msg: S, e: E) -> MtopError
502 where
503 S: Into<String>,
504 E: error::Error + Send + Sync + 'static,
505 {
506 MtopError {
507 kind: ErrorKind::Runtime,
508 repr: ErrorRepr::MessageCause(msg.into(), Box::new(e)),
509 }
510 }
511
512 pub fn configuration<S>(msg: S) -> MtopError
513 where
514 S: Into<String>,
515 {
516 MtopError {
517 kind: ErrorKind::Configuration,
518 repr: ErrorRepr::Message(msg.into()),
519 }
520 }
521
522 pub fn configuration_cause<S, E>(msg: S, e: E) -> MtopError
523 where
524 S: Into<String>,
525 E: error::Error + Send + Sync + 'static,
526 {
527 MtopError {
528 kind: ErrorKind::Configuration,
529 repr: ErrorRepr::MessageCause(msg.into(), Box::new(e)),
530 }
531 }
532
533 pub fn timeout<D>(t: Duration, operation: D) -> MtopError
534 where
535 D: fmt::Display,
536 {
537 MtopError {
538 kind: ErrorKind::IO,
539 repr: ErrorRepr::Message(format!("operation {} timed out after {:?}", operation, t)),
540 }
541 }
542
543 pub fn kind(&self) -> ErrorKind {
544 self.kind
545 }
546}
547
548impl fmt::Display for MtopError {
549 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550 match &self.repr {
551 ErrorRepr::Message(msg) => write!(f, "{}: {}", self.kind, msg),
552 ErrorRepr::Cause(e) => write!(f, "{}: {}", self.kind, e),
553 ErrorRepr::MessageCause(msg, e) => write!(f, "{}: {}: {}", self.kind, msg, e),
554 }
555 }
556}
557
558impl error::Error for MtopError {
559 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
560 match &self.repr {
561 ErrorRepr::Message(_) => None,
562 ErrorRepr::Cause(e) => Some(e.as_ref()),
563 ErrorRepr::MessageCause(_, e) => Some(e.as_ref()),
564 }
565 }
566}
567
568impl From<(String, io::Error)> for MtopError {
569 fn from((s, e): (String, io::Error)) -> Self {
570 MtopError {
571 kind: ErrorKind::IO,
572 repr: ErrorRepr::MessageCause(s, Box::new(e)),
573 }
574 }
575}
576
577impl From<io::Error> for MtopError {
578 fn from(e: io::Error) -> Self {
579 MtopError {
580 kind: ErrorKind::IO,
581 repr: ErrorRepr::Cause(Box::new(e)),
582 }
583 }
584}
585
586impl From<ProtocolError> for MtopError {
587 fn from(e: ProtocolError) -> Self {
588 MtopError {
589 kind: ErrorKind::Protocol,
590 repr: ErrorRepr::Cause(Box::new(e)),
591 }
592 }
593}
594
/// Category of an error line sent by the server.
///
/// Each variant corresponds to the first word of the response line, which is
/// also what the `Display` impl prints.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum ProtocolErrorKind {
    /// Response line starting with `BADCLASS`.
    BadClass,
    /// Response line starting with `BUSY`.
    Busy,
    /// Response line starting with `CLIENT_ERROR`.
    Client,
    /// Response line `NOT_FOUND`.
    NotFound,
    /// Response line `NOT_STORED`.
    NotStored,
    /// Response line starting with `SERVER_ERROR`.
    Server,
    /// Response line `ERROR`, with or without a message.
    Syntax,
}
605
606impl fmt::Display for ProtocolErrorKind {
607 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
608 match self {
609 Self::BadClass => "BADCLASS".fmt(f),
610 Self::Busy => "BUSY".fmt(f),
611 Self::Client => "CLIENT_ERROR".fmt(f),
612 Self::NotFound => "NOT_FOUND".fmt(f),
613 Self::NotStored => "NOT_STORED".fmt(f),
614 Self::Server => "SERVER_ERROR".fmt(f),
615 Self::Syntax => "ERROR".fmt(f),
616 }
617 }
618}
619
/// An error line returned by the server: its kind plus the optional text that
/// followed the keyword on the response line.
#[derive(Debug)]
pub struct ProtocolError {
    kind: ProtocolErrorKind,
    message: Option<String>,
}
625
626impl ProtocolError {
627 pub fn kind(&self) -> ProtocolErrorKind {
628 self.kind
629 }
630}
631
632impl fmt::Display for ProtocolError {
633 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
634 if let Some(msg) = &self.message {
635 write!(f, "{} {}", self.kind, msg)
636 } else {
637 write!(f, "{}", self.kind)
638 }
639 }
640}
641
// Marker impl: the trait's default methods are used, so no source is reported.
impl error::Error for ProtocolError {}
643
/// A request that can be serialized to bytes (via `From<Command> for Vec<u8>`)
/// and written to the server.
#[derive(Debug, Eq, PartialEq, Clone)]
enum Command<'a> {
    /// `add` with, in order: key, flags, TTL and payload.
    Add(&'a Key, u64, u32, &'a [u8]),
    /// `lru_crawler metadump hash`.
    CrawlerMetadump,
    /// `decr` with key and delta.
    Decr(&'a Key, u64),
    /// `delete` with key.
    Delete(&'a Key),
    /// `flush_all` with no argument.
    FlushAll,
    /// `flush_all` with a numeric argument.
    FlushAllWait(u64),
    /// `gets` with the keys joined by single spaces.
    Gets(&'a [Key]),
    /// `incr` with key and delta.
    Incr(&'a Key, u64),
    /// `replace` with, in order: key, flags, TTL and payload.
    Replace(&'a Key, u64, u32, &'a [u8]),
    /// `stats`.
    Stats,
    /// `stats items`.
    StatsItems,
    /// `stats slabs`.
    StatsSlabs,
    /// `set` with, in order: key, flags, TTL and payload.
    Set(&'a Key, u64, u32, &'a [u8]),
    /// `touch` with key and TTL.
    Touch(&'a Key, u32),
    /// `version`.
    Version,
}
662
663impl<'a> From<Command<'a>> for Vec<u8> {
664 fn from(value: Command<'a>) -> Self {
665 match value {
666 Command::Add(key, flags, ttl, data) => storage_command("add", key, flags, ttl, data),
667 Command::CrawlerMetadump => "lru_crawler metadump hash\r\n".to_owned().into_bytes(),
668 Command::Decr(key, delta) => format!("decr {} {}\r\n", key, delta).into_bytes(),
669 Command::Delete(key) => format!("delete {}\r\n", key).into_bytes(),
670 Command::FlushAll => "flush_all\r\n".to_owned().into_bytes(),
671 Command::FlushAllWait(wait) => format!("flush_all {}\r\n", wait).into_bytes(),
672 Command::Gets(keys) => format!("gets {}\r\n", keys.join(" ")).into_bytes(),
673 Command::Incr(key, delta) => format!("incr {} {}\r\n", key, delta).into_bytes(),
674 Command::Replace(key, flags, ttl, data) => storage_command("replace", key, flags, ttl, data),
675 Command::Stats => "stats\r\n".to_owned().into_bytes(),
676 Command::StatsItems => "stats items\r\n".to_owned().into_bytes(),
677 Command::StatsSlabs => "stats slabs\r\n".to_owned().into_bytes(),
678 Command::Set(key, flags, ttl, data) => storage_command("set", key, flags, ttl, data),
679 Command::Touch(key, ttl) => format!("touch {} {}\r\n", key, ttl).into_bytes(),
680 Command::Version => "version\r\n".to_owned().into_bytes(),
681 }
682 }
683}
684
685fn storage_command(verb: &str, key: &Key, flags: u64, ttl: u32, data: &[u8]) -> Vec<u8> {
686 let mut bytes = Vec::with_capacity(key.len() + data.len() + 32);
687 io::Write::write_all(
688 &mut bytes,
689 format!("{} {} {} {} {}\r\n", verb, key, flags, ttl, data.len()).as_bytes(),
690 )
691 .unwrap();
692 io::Write::write_all(&mut bytes, data).unwrap();
693 io::Write::write_all(&mut bytes, "\r\n".as_bytes()).unwrap();
694 bytes
695}
696
/// Client for a single server connection, split into a read half and a write
/// half.
pub struct Memcached {
    /// Line-oriented, buffered reader over the read half.
    read: Lines<BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>>,
    /// Buffered writer over the write half; flushed after every command sent.
    write: BufWriter<Box<dyn AsyncWrite + Send + Sync + Unpin>>,
}
701
702impl Memcached {
703 const MAX_PAYLOAD_SIZE: u64 = 1024 * 1024 * 1024;
704
705 pub fn new<R, W>(read: R, write: W) -> Self
706 where
707 R: AsyncRead + Send + Sync + Unpin + 'static,
708 W: AsyncWrite + Send + Sync + Unpin + 'static,
709 {
710 Memcached {
711 read: BufReader::<Box<dyn AsyncRead + Send + Sync + Unpin>>::new(Box::new(read)).lines(),
712 write: BufWriter::new(Box::new(write)),
713 }
714 }
715
716 pub async fn stats(&mut self) -> Result<Stats, MtopError> {
718 self.send(Command::Stats).await?;
719 let raw = self.read_stats_response().await?;
720 Stats::try_from(&raw)
721 }
722
723 pub async fn slabs(&mut self) -> Result<Slabs, MtopError> {
728 self.send(Command::StatsSlabs).await?;
729 let raw = self.read_stats_response().await?;
730 Slabs::try_from(&raw)
731 }
732
733 pub async fn items(&mut self) -> Result<SlabItems, MtopError> {
738 self.send(Command::StatsItems).await?;
739 let raw = self.read_stats_response().await?;
740 SlabItems::try_from(&raw)
741 }
742
743 async fn read_stats_response(&mut self) -> Result<HashMap<String, String>, MtopError> {
744 let mut out = HashMap::new();
745
746 while let Some(v) = self.read.next_line().await? {
747 if v == "END" {
748 break;
749 }
750
751 let (key, val) = Self::parse_stat_line(&v)?;
752 out.insert(key.to_owned(), val.to_owned());
753 }
754
755 Ok(out)
756 }
757
758 fn parse_stat_line(line: &str) -> Result<(&str, &str), MtopError> {
759 let mut parts = line.splitn(3, ' ');
760 match (parts.next(), parts.next(), parts.next()) {
761 (Some("STAT"), Some(key), Some(val)) => Ok((key, val)),
762 _ => {
763 if let Some(err) = Self::parse_error(line) {
764 Err(MtopError::from(err))
765 } else {
766 Err(MtopError::runtime(format!("unable to parse '{}'", line)))
767 }
768 }
769 }
770 }
771
772 pub async fn metas(&mut self) -> Result<Vec<Meta>, MtopError> {
776 self.send(Command::CrawlerMetadump).await?;
777 let mut out = Vec::new();
778 let mut raw = HashMap::new();
779
780 while let Some(v) = self.read.next_line().await? {
781 if v == "END" {
782 break;
783 }
784
785 if let Some(err) = Self::parse_error(&v) {
789 return Err(MtopError::from(err));
790 }
791
792 let item = Self::parse_crawler_meta(&v, Meta::KEYS, &mut raw)?;
793 out.push(item);
794 }
795
796 Ok(out)
797 }
798
799 fn parse_crawler_meta(line: &str, keys: &[&str], raw: &mut HashMap<String, String>) -> Result<Meta, MtopError> {
800 raw.clear();
802
803 for p in line.split(' ') {
804 let (key, val) = p
805 .split_once('=')
806 .ok_or_else(|| MtopError::runtime(format!("unexpected metadump format '{}'", line)))?;
807
808 if !keys.contains(&key) {
812 continue;
813 }
814
815 let decoded = urlencoding::decode(val)
816 .map_err(|e| MtopError::runtime_cause(format!("unexpected metadump encoding '{}'", line), e))?;
817 raw.insert(key.to_owned(), decoded.into_owned());
818 }
819
820 Meta::try_from(raw.deref())
821 }
822
823 pub async fn ping(&mut self) -> Result<(), MtopError> {
825 self.send(Command::Version).await?;
826 if let Some(v) = self.read.next_line().await? {
827 if let Some(e) = Self::parse_error(&v) {
828 return Err(MtopError::from(e));
829 }
830 if !v.starts_with("VERSION") {
831 return Err(MtopError::runtime(format!("unable to parse '{}'", v)));
832 }
833 }
834
835 Ok(())
836 }
837
838 pub async fn flush_all(&mut self, wait: Option<Duration>) -> Result<(), MtopError> {
841 let cmd = if let Some(d) = wait {
842 Command::FlushAllWait(d.as_secs())
843 } else {
844 Command::FlushAll
845 };
846
847 self.send(cmd).await?;
848 self.read_simple_response("OK").await
849 }
850
851 pub async fn get(&mut self, keys: &[Key]) -> Result<HashMap<String, Value>, MtopError> {
854 self.send(Command::Gets(keys)).await?;
855 let mut out = HashMap::with_capacity(keys.len());
856
857 while let Some(v) = self.read.next_line().await? {
858 if v == "END" {
859 break;
860 }
861
862 let value = self.parse_gets_value(&v).await?;
863 out.insert(value.key.clone(), value);
864 }
865
866 Ok(out)
867 }
868
869 async fn parse_gets_value(&mut self, line: &str) -> Result<Value, MtopError> {
870 let mut parts = line.splitn(5, ' ');
871
872 match (parts.next(), parts.next(), parts.next(), parts.next(), parts.next()) {
873 (Some("VALUE"), Some(k), Some(flags), Some(len), Some(cas)) => {
874 let flags: u64 = parse_value(flags, line)?;
875 let len: u64 = parse_value(len, line)?;
876 let cas: u64 = parse_value(cas, line)?;
877
878 if len > Self::MAX_PAYLOAD_SIZE {
883 return Err(MtopError::runtime(format!(
884 "server response of length {} exceeds client max of {}",
885 len,
886 Self::MAX_PAYLOAD_SIZE
887 )));
888 }
889
890 let mut data = Vec::with_capacity(len as usize + 2);
892 let reader = self.read.get_mut();
893 reader.take(len + 2).read_to_end(&mut data).await?;
894 data.truncate(len as usize);
895
896 Ok(Value {
897 key: k.to_owned(),
898 flags,
899 cas,
900 data,
901 })
902 }
903 _ => {
904 if let Some(err) = Self::parse_error(line) {
908 Err(MtopError::from(err))
909 } else {
910 Err(MtopError::runtime(format!("unable to parse '{}'", line)))
911 }
912 }
913 }
914 }
915
916 pub async fn incr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
919 self.send(Command::Incr(key, delta)).await?;
920 if let Some(v) = self.read.next_line().await? {
921 Self::parse_numeric_response(&v)
922 } else {
923 Err(MtopError::runtime("unexpected empty response"))
924 }
925 }
926
927 pub async fn decr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
930 self.send(Command::Decr(key, delta)).await?;
931 if let Some(v) = self.read.next_line().await? {
932 Self::parse_numeric_response(&v)
933 } else {
934 Err(MtopError::runtime("unexpected empty response"))
935 }
936 }
937
938 fn parse_numeric_response(line: &str) -> Result<u64, MtopError> {
939 if let Some(err) = Self::parse_error(line) {
940 Err(MtopError::from(err))
941 } else {
942 line.parse()
943 .map_err(|_e| MtopError::runtime(format!("unable to parse '{}'", line)))
944 }
945 }
946
947 pub async fn set<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
949 where
950 V: AsRef<[u8]>,
951 {
952 self.send(Command::Set(key, flags, ttl, data.as_ref())).await?;
953 self.read_simple_response("STORED").await
954 }
955
956 pub async fn add<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
958 where
959 V: AsRef<[u8]>,
960 {
961 self.send(Command::Add(key, flags, ttl, data.as_ref())).await?;
962 self.read_simple_response("STORED").await
963 }
964
965 pub async fn replace<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
967 where
968 V: AsRef<[u8]>,
969 {
970 self.send(Command::Replace(key, flags, ttl, data.as_ref())).await?;
971 self.read_simple_response("STORED").await
972 }
973
974 pub async fn touch(&mut self, key: &Key, ttl: u32) -> Result<(), MtopError> {
976 self.send(Command::Touch(key, ttl)).await?;
977 self.read_simple_response("TOUCHED").await
978 }
979
980 pub async fn delete(&mut self, key: &Key) -> Result<(), MtopError> {
982 self.send(Command::Delete(key)).await?;
983 self.read_simple_response("DELETED").await
984 }
985
986 async fn read_simple_response(&mut self, expected: &str) -> Result<(), MtopError> {
987 match self.read.next_line().await? {
988 Some(line) if line == expected => Ok(()),
989 Some(line) => {
990 if let Some(err) = Self::parse_error(&line) {
991 Err(MtopError::from(err))
992 } else {
993 Err(MtopError::runtime(format!("unable to parse '{}'", line)))
994 }
995 }
996 None => Err(MtopError::runtime("unexpected empty response")),
997 }
998 }
999
1000 fn parse_error(line: &str) -> Option<ProtocolError> {
1001 let mut values = line.splitn(2, ' ');
1002 let (kind, message) = match (values.next(), values.next()) {
1003 (Some("BADCLASS"), Some(msg)) => (ProtocolErrorKind::BadClass, Some(msg.to_owned())),
1004 (Some("BUSY"), Some(msg)) => (ProtocolErrorKind::Busy, Some(msg.to_owned())),
1005 (Some("CLIENT_ERROR"), Some(msg)) => (ProtocolErrorKind::Client, Some(msg.to_owned())),
1006 (Some("ERROR"), None) => (ProtocolErrorKind::Syntax, None),
1007 (Some("ERROR"), Some(msg)) => (ProtocolErrorKind::Syntax, Some(msg.to_owned())),
1008 (Some("NOT_FOUND"), None) => (ProtocolErrorKind::NotFound, None),
1009 (Some("NOT_STORED"), None) => (ProtocolErrorKind::NotStored, None),
1010 (Some("SERVER_ERROR"), Some(msg)) => (ProtocolErrorKind::Server, Some(msg.to_owned())),
1011
1012 _ => return None,
1013 };
1014
1015 Some(ProtocolError { kind, message })
1016 }
1017
1018 async fn send(&mut self, cmd: Command<'_>) -> Result<(), MtopError> {
1019 let cmd_bytes: Vec<u8> = cmd.into();
1020 self.write.write_all(&cmd_bytes).await?;
1021 Ok(self.write.flush().await?)
1022 }
1023}
1024
1025impl fmt::Debug for Memcached {
1026 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1027 write!(f, "Memcached {{ read: <...>, write: <...> }}")
1028 }
1029}
1030
/// A validated cache key.
///
/// The inner string is private, so values are created through `Key::one` and
/// `Key::many`, which reject strings that fail validation.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
#[repr(transparent)]
pub struct Key(String);
1034
1035impl Key {
1036 const MAX_LENGTH: usize = 250;
1037
1038 pub fn one<T>(val: T) -> Result<Key, MtopError>
1039 where
1040 T: Into<String>,
1041 {
1042 let val = val.into();
1043 if !Self::is_legal_val(&val) {
1044 Err(MtopError::runtime(format!("invalid key {}", val)))
1045 } else {
1046 Ok(Key(val))
1047 }
1048 }
1049
1050 pub fn many<I, T>(vals: I) -> Result<Vec<Key>, MtopError>
1051 where
1052 I: IntoIterator<Item = T>,
1053 T: Into<String>,
1054 {
1055 let iter = vals.into_iter();
1056 let (sz, _) = iter.size_hint();
1057 let mut out = Vec::with_capacity(sz);
1058
1059 for val in iter {
1060 out.push(Self::one(val)?);
1061 }
1062
1063 Ok(out)
1064 }
1065
1066 pub fn len(&self) -> usize {
1067 self.0.len()
1068 }
1069
1070 pub fn is_empty(&self) -> bool {
1071 self.0.is_empty()
1072 }
1073
1074 fn is_legal_val(val: &str) -> bool {
1075 if val.len() > Self::MAX_LENGTH {
1076 return false;
1077 }
1078
1079 for c in val.chars() {
1080 if !c.is_ascii() || c.is_ascii_whitespace() || c.is_ascii_control() {
1081 return false;
1082 }
1083 }
1084
1085 true
1086 }
1087}
1088
1089impl fmt::Display for Key {
1090 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1091 self.0.fmt(f)
1092 }
1093}
1094
1095impl AsRef<str> for Key {
1096 fn as_ref(&self) -> &str {
1097 &self.0
1098 }
1099}
1100
1101impl Borrow<str> for Key {
1102 fn borrow(&self) -> &str {
1103 &self.0
1104 }
1105}
1106
1107#[cfg(test)]
1108mod test {
1109 use super::{ErrorKind, Key, Memcached, Meta, Slab, SlabItem, SlabItems};
1110 use std::io::{Cursor, Error};
1111 use std::pin::Pin;
1112 use std::task::{Context, Poll};
1113 use std::time::Duration;
1114 use tokio::io::AsyncWrite;
1115 use tokio::sync::mpsc::{self, UnboundedSender};
1116
1117 #[test]
1122 fn test_key_one_length() {
1123 let val = "abc".repeat(Key::MAX_LENGTH);
1124 let res = Key::one(val);
1125 assert!(res.is_err());
1126 }
1127
1128 #[test]
1129 fn test_key_one_non_ascii() {
1130 let val = "🤦";
1131 let res = Key::one(val);
1132 assert!(res.is_err());
1133 }
1134
1135 #[test]
1136 fn test_key_one_whitespace() {
1137 let val = "some thing";
1138 let res = Key::one(val);
1139 assert!(res.is_err());
1140 }
1141
1142 #[test]
1143 fn test_key_one_control_char() {
1144 let val = "\x7F";
1145 let res = Key::one(val);
1146 assert!(res.is_err());
1147 }
1148
1149 #[test]
1150 fn test_key_one_success() {
1151 let val = "a-reasonable-key";
1152 let res = Key::one(val);
1153 assert!(res.is_ok());
1154 }
1155
    /// Test-only writer that forwards every buffer written to it over an
    /// unbounded channel.
    struct WriteAdapter {
        tx: UnboundedSender<Vec<u8>>,
    }
1159
1160 impl AsyncWrite for WriteAdapter {
1161 fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
1162 self.tx.send(buf.to_owned()).unwrap();
1163 Poll::Ready(Ok(buf.len()))
1164 }
1165
1166 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1167 Poll::Ready(Ok(()))
1168 }
1169
1170 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1171 Poll::Ready(Ok(()))
1172 }
1173 }
1174
    /// Builds an `(rx, client)` pair for tests.
    ///
    /// The client reads from an in-memory cursor containing the concatenated
    /// bytes of the given string expressions (or no bytes at all when invoked
    /// without arguments) and writes through a `WriteAdapter`, so every buffer
    /// it writes can be received from `rx`.
    macro_rules! client {
        // No canned response: the client sees end-of-stream immediately.
        () => ({
            let (tx, rx) = mpsc::unbounded_channel();
            let reads = Vec::new();
            (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
        });
        // One or more response fragments, with an optional trailing comma.
        ($($line:expr),+ $(,)?) => ({
            let (tx, rx) = mpsc::unbounded_channel();
            let mut reads = Vec::new();
            $(reads.extend_from_slice($line.as_bytes());)+
            (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
        })
    }
1192
1193 #[tokio::test]
1198 async fn test_memcached_get_no_key() {
1199 let (_rx, mut client) = client!();
1200 let vals: Vec<String> = vec![];
1201 let keys = Key::many(vals).unwrap();
1202 let res = client.get(&keys).await.unwrap();
1203
1204 assert!(res.is_empty());
1205 }
1206
1207 #[tokio::test]
1208 async fn test_memcached_get_error() {
1209 let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1210 let keys = Key::many(vec!["foo", "baz"]).unwrap();
1211 let res = client.get(&keys).await;
1212
1213 assert!(res.is_err());
1214 let err = res.unwrap_err();
1215 assert_eq!(ErrorKind::Protocol, err.kind());
1216 }
1217
1218 #[tokio::test]
1219 async fn test_memcached_get_miss() {
1220 let (_rx, mut client) = client!("END\r\n");
1221 let keys = Key::many(vec!["foo", "baz"]).unwrap();
1222 let res = client.get(&keys).await.unwrap();
1223
1224 assert!(res.is_empty());
1225 }
1226
1227 #[tokio::test]
1228 async fn test_memcached_get_hit() {
1229 let (_rx, mut client) = client!(
1230 "VALUE foo 32 3 1\r\n",
1231 "bar\r\n",
1232 "VALUE baz 64 3 2\r\n",
1233 "qux\r\n",
1234 "END\r\n",
1235 );
1236 let keys = Key::many(vec!["foo", "baz"]).unwrap();
1237 let res = client.get(&keys).await.unwrap();
1238
1239 let val1 = res.get("foo").unwrap();
1240 assert_eq!("foo", val1.key);
1241 assert_eq!("bar".as_bytes(), val1.data);
1242 assert_eq!(32, val1.flags);
1243 assert_eq!(1, val1.cas);
1244
1245 let val2 = res.get("baz").unwrap();
1246 assert_eq!("baz", val2.key);
1247 assert_eq!("qux".as_bytes(), val2.data);
1248 assert_eq!(64, val2.flags);
1249 assert_eq!(2, val2.cas);
1250 }
1251
1252 #[tokio::test]
1257 async fn test_memcached_incr_bad_val() {
1258 let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1259 let key = Key::one("test").unwrap();
1260 let res = client.incr(&key, 2).await;
1261
1262 assert!(res.is_err());
1263 let err = res.unwrap_err();
1264 assert_eq!(ErrorKind::Protocol, err.kind());
1265
1266 let bytes = rx.recv().await.unwrap();
1267 let command = String::from_utf8(bytes).unwrap();
1268 assert_eq!("incr test 2\r\n", command);
1269 }
1270
1271 #[tokio::test]
1272 async fn test_memcached_incr_success() {
1273 let (mut rx, mut client) = client!("3\r\n");
1274 let key = Key::one("test").unwrap();
1275 let res = client.incr(&key, 2).await.unwrap();
1276
1277 assert_eq!(3, res);
1278 let bytes = rx.recv().await.unwrap();
1279 let command = String::from_utf8(bytes).unwrap();
1280 assert_eq!("incr test 2\r\n", command);
1281 }
1282
1283 #[tokio::test]
1288 async fn test_memcached_decr_bad_val() {
1289 let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1290 let key = Key::one("test").unwrap();
1291 let res = client.decr(&key, 1).await;
1292
1293 assert!(res.is_err());
1294 let err = res.unwrap_err();
1295 assert_eq!(ErrorKind::Protocol, err.kind());
1296
1297 let bytes = rx.recv().await.unwrap();
1298 let command = String::from_utf8(bytes).unwrap();
1299 assert_eq!("decr test 1\r\n", command);
1300 }
1301
1302 #[tokio::test]
1303 async fn test_memcached_decr_success() {
1304 let (mut rx, mut client) = client!("3\r\n");
1305 let key = Key::one("test").unwrap();
1306 let res = client.decr(&key, 1).await.unwrap();
1307
1308 assert_eq!(3, res);
1309 let bytes = rx.recv().await.unwrap();
1310 let command = String::from_utf8(bytes).unwrap();
1311 assert_eq!("decr test 1\r\n", command);
1312 }
1313
1314 #[tokio::test]
1319 async fn test_memcached_ping_bad_val() {
1320 let (mut rx, mut client) = client!("220 localhost ESMTP Postfix\r\n");
1321 let res = client.ping().await;
1322
1323 assert!(res.is_err());
1324 let err = res.unwrap_err();
1325 assert_eq!(ErrorKind::Runtime, err.kind());
1326
1327 let bytes = rx.recv().await.unwrap();
1328 let command = String::from_utf8(bytes).unwrap();
1329 assert_eq!("version\r\n", command);
1330 }
1331
1332 #[tokio::test]
1333 async fn test_memcached_ping_success() {
1334 let (mut rx, mut client) = client!("VERSION 1.6.22\r\n");
1335 client.ping().await.unwrap();
1336
1337 let bytes = rx.recv().await.unwrap();
1338 let command = String::from_utf8(bytes).unwrap();
1339 assert_eq!("version\r\n", command);
1340 }
1341
1342 macro_rules! test_store_command_success {
1343 ($method:ident, $verb:expr) => {
1344 let (mut rx, mut client) = client!("STORED\r\n");
1345 let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1346
1347 assert!(res.is_ok());
1348 let bytes = rx.recv().await.unwrap();
1349 let command = String::from_utf8(bytes).unwrap();
1350 assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1351 };
1352 }
1353
1354 macro_rules! test_store_command_error {
1355 ($method:ident, $verb:expr) => {
1356 let (mut rx, mut client) = client!("NOT_STORED\r\n");
1357 let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1358
1359 assert!(res.is_err());
1360 let err = res.unwrap_err();
1361 assert_eq!(ErrorKind::Protocol, err.kind());
1362
1363 let bytes = rx.recv().await.unwrap();
1364 let command = String::from_utf8(bytes).unwrap();
1365 assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1366 };
1367 }
1368
1369 #[tokio::test]
1374 async fn test_memcached_set_success() {
1375 test_store_command_success!(set, "set");
1376 }
1377
1378 #[tokio::test]
1379 async fn test_memcached_set_error() {
1380 test_store_command_error!(set, "set");
1381 }
1382
1383 #[tokio::test]
1388 async fn test_memcached_add_success() {
1389 test_store_command_success!(add, "add");
1390 }
1391
1392 #[tokio::test]
1393 async fn test_memcached_add_error() {
1394 test_store_command_error!(add, "add");
1395 }
1396
1397 #[tokio::test]
1402 async fn test_memcached_replace_success() {
1403 test_store_command_success!(replace, "replace");
1404 }
1405
1406 #[tokio::test]
1407 async fn test_memcached_replace_error() {
1408 test_store_command_error!(replace, "replace");
1409 }
1410
1411 #[tokio::test]
1416 async fn test_memcached_touch_success() {
1417 let (mut rx, mut client) = client!("TOUCHED\r\n");
1418 let key = Key::one("test").unwrap();
1419 let res = client.touch(&key, 300).await;
1420
1421 assert!(res.is_ok());
1422 let bytes = rx.recv().await.unwrap();
1423 let command = String::from_utf8(bytes).unwrap();
1424 assert_eq!("touch test 300\r\n", command);
1425 }
1426
1427 #[tokio::test]
1428 async fn test_memcached_touch_error() {
1429 let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1430 let key = Key::one("test").unwrap();
1431 let res = client.touch(&key, 300).await;
1432
1433 assert!(res.is_err());
1434 let err = res.unwrap_err();
1435 assert_eq!(ErrorKind::Protocol, err.kind());
1436
1437 let bytes = rx.recv().await.unwrap();
1438 let command = String::from_utf8(bytes).unwrap();
1439 assert_eq!("touch test 300\r\n", command);
1440 }
1441
1442 #[tokio::test]
1447 async fn test_memcached_delete_success() {
1448 let (mut rx, mut client) = client!("DELETED\r\n");
1449 let key = Key::one("test").unwrap();
1450 let res = client.delete(&key).await;
1451
1452 assert!(res.is_ok());
1453 let bytes = rx.recv().await.unwrap();
1454 let command = String::from_utf8(bytes).unwrap();
1455 assert_eq!("delete test\r\n", command);
1456 }
1457
1458 #[tokio::test]
1459 async fn test_memcached_delete_error() {
1460 let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1461 let key = Key::one("test").unwrap();
1462 let res = client.delete(&key).await;
1463
1464 assert!(res.is_err());
1465 let err = res.unwrap_err();
1466 assert_eq!(ErrorKind::Protocol, err.kind());
1467
1468 let bytes = rx.recv().await.unwrap();
1469 let command = String::from_utf8(bytes).unwrap();
1470 assert_eq!("delete test\r\n", command);
1471 }
1472
1473 #[tokio::test]
1478 async fn test_memcached_flush_all_with_wait_success() {
1479 let (mut rx, mut client) = client!("OK\r\n");
1480 let wait = Some(Duration::from_secs(25));
1481 let res = client.flush_all(wait).await;
1482
1483 assert!(res.is_ok());
1484 let bytes = rx.recv().await.unwrap();
1485 let command = String::from_utf8(bytes).unwrap();
1486 assert_eq!("flush_all 25\r\n", command);
1487 }
1488
1489 #[tokio::test]
1490 async fn test_memcached_flush_all_with_wait_error() {
1491 let (mut rx, mut client) = client!("ERROR\r\n");
1492 let wait = Some(Duration::from_secs(25));
1493 let res = client.flush_all(wait).await;
1494
1495 assert!(res.is_err());
1496 let err = res.unwrap_err();
1497 assert_eq!(ErrorKind::Protocol, err.kind());
1498
1499 let bytes = rx.recv().await.unwrap();
1500 let command = String::from_utf8(bytes).unwrap();
1501 assert_eq!("flush_all 25\r\n", command);
1502 }
1503
1504 #[tokio::test]
1505 async fn test_memcached_flush_all_no_wait_success() {
1506 let (mut rx, mut client) = client!("OK\r\n");
1507 let res = client.flush_all(None).await;
1508
1509 assert!(res.is_ok());
1510 let bytes = rx.recv().await.unwrap();
1511 let command = String::from_utf8(bytes).unwrap();
1512 assert_eq!("flush_all\r\n", command);
1513 }
1514
1515 #[tokio::test]
1516 async fn test_memcached_flush_all_no_wait_error() {
1517 let (mut rx, mut client) = client!("ERROR\r\n");
1518 let res = client.flush_all(None).await;
1519
1520 assert!(res.is_err());
1521 let err = res.unwrap_err();
1522 assert_eq!(ErrorKind::Protocol, err.kind());
1523
1524 let bytes = rx.recv().await.unwrap();
1525 let command = String::from_utf8(bytes).unwrap();
1526 assert_eq!("flush_all\r\n", command);
1527 }
1528
1529 #[tokio::test]
1534 async fn test_memcached_stats_empty() {
1535 let (_rx, mut client) = client!("END\r\n");
1536 let res = client.stats().await;
1537
1538 assert!(res.is_err());
1539 let err = res.unwrap_err();
1540 assert_eq!(ErrorKind::Runtime, err.kind());
1541 }
1542
1543 #[tokio::test]
1544 async fn test_memcached_stats_error() {
1545 let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1546 let res = client.stats().await;
1547
1548 assert!(res.is_err());
1549 let err = res.unwrap_err();
1550 assert_eq!(ErrorKind::Protocol, err.kind());
1551 }
1552
1553 #[tokio::test]
1554 async fn test_memcached_stats_success() {
1555 let (_rx, mut client) = client!(
1556 "STAT pid 1525\r\n",
1557 "STAT uptime 271984\r\n",
1558 "STAT time 1687212809\r\n",
1559 "STAT version 1.6.14\r\n",
1560 "STAT libevent 2.1.12-stable\r\n",
1561 "STAT pointer_size 64\r\n",
1562 "STAT rusage_user 17.544323\r\n",
1563 "STAT rusage_system 11.830461\r\n",
1564 "STAT max_connections 1024\r\n",
1565 "STAT curr_connections 1\r\n",
1566 "STAT total_connections 3\r\n",
1567 "STAT rejected_connections 0\r\n",
1568 "STAT connection_structures 2\r\n",
1569 "STAT response_obj_oom 0\r\n",
1570 "STAT response_obj_count 1\r\n",
1571 "STAT response_obj_bytes 32768\r\n",
1572 "STAT read_buf_count 4\r\n",
1573 "STAT read_buf_bytes 65536\r\n",
1574 "STAT read_buf_bytes_free 16384\r\n",
1575 "STAT read_buf_oom 0\r\n",
1576 "STAT reserved_fds 20\r\n",
1577 "STAT cmd_get 1\r\n",
1578 "STAT cmd_set 0\r\n",
1579 "STAT cmd_flush 0\r\n",
1580 "STAT cmd_touch 0\r\n",
1581 "STAT cmd_meta 0\r\n",
1582 "STAT get_hits 0\r\n",
1583 "STAT get_misses 1\r\n",
1584 "STAT get_expired 0\r\n",
1585 "STAT get_flushed 0\r\n",
1586 "STAT delete_misses 0\r\n",
1587 "STAT delete_hits 0\r\n",
1588 "STAT incr_misses 0\r\n",
1589 "STAT incr_hits 0\r\n",
1590 "STAT decr_misses 0\r\n",
1591 "STAT decr_hits 0\r\n",
1592 "STAT cas_misses 0\r\n",
1593 "STAT cas_hits 0\r\n",
1594 "STAT cas_badval 0\r\n",
1595 "STAT touch_hits 0\r\n",
1596 "STAT touch_misses 0\r\n",
1597 "STAT store_too_large 0\r\n",
1598 "STAT store_no_memory 0\r\n",
1599 "STAT auth_cmds 0\r\n",
1600 "STAT auth_errors 0\r\n",
1601 "STAT bytes_read 16\r\n",
1602 "STAT bytes_written 7\r\n",
1603 "STAT limit_maxbytes 67108864\r\n",
1604 "STAT accepting_conns 1\r\n",
1605 "STAT listen_disabled_num 0\r\n",
1606 "STAT time_in_listen_disabled_us 0\r\n",
1607 "STAT threads 4\r\n",
1608 "STAT conn_yields 0\r\n",
1609 "STAT hash_power_level 16\r\n",
1610 "STAT hash_bytes 524288\r\n",
1611 "STAT hash_is_expanding 0\r\n",
1612 "STAT slab_reassign_rescues 0\r\n",
1613 "STAT slab_reassign_chunk_rescues 0\r\n",
1614 "STAT slab_reassign_evictions_nomem 0\r\n",
1615 "STAT slab_reassign_inline_reclaim 0\r\n",
1616 "STAT slab_reassign_busy_items 0\r\n",
1617 "STAT slab_reassign_busy_deletes 0\r\n",
1618 "STAT slab_reassign_running 0\r\n",
1619 "STAT slabs_moved 0\r\n",
1620 "STAT lru_crawler_running 0\r\n",
1621 "STAT lru_crawler_starts 105\r\n",
1622 "STAT lru_maintainer_juggles 271976\r\n",
1623 "STAT malloc_fails 0\r\n",
1624 "STAT log_worker_dropped 0\r\n",
1625 "STAT log_worker_written 0\r\n",
1626 "STAT log_watcher_skipped 0\r\n",
1627 "STAT log_watcher_sent 0\r\n",
1628 "STAT log_watchers 0\r\n",
1629 "STAT unexpected_napi_ids 0\r\n",
1630 "STAT round_robin_fallback 0\r\n",
1631 "STAT bytes 0\r\n",
1632 "STAT curr_items 0\r\n",
1633 "STAT total_items 0\r\n",
1634 "STAT slab_global_page_pool 0\r\n",
1635 "STAT expired_unfetched 0\r\n",
1636 "STAT evicted_unfetched 0\r\n",
1637 "STAT evicted_active 0\r\n",
1638 "STAT evictions 0\r\n",
1639 "STAT reclaimed 0\r\n",
1640 "STAT crawler_reclaimed 0\r\n",
1641 "STAT crawler_items_checked 0\r\n",
1642 "STAT lrutail_reflocked 0\r\n",
1643 "STAT moves_to_cold 0\r\n",
1644 "STAT moves_to_warm 0\r\n",
1645 "STAT moves_within_lru 0\r\n",
1646 "STAT direct_reclaims 0\r\n",
1647 "STAT lru_bumps_dropped 0\r\n",
1648 "END\r\n",
1649 );
1650 let res = client.stats().await.unwrap();
1651
1652 assert_eq!(0, res.cmd_set);
1653 assert_eq!(1, res.cmd_get);
1654 assert_eq!(1, res.get_misses);
1655 assert_eq!(0, res.get_hits);
1656 }
1657
1658 #[tokio::test]
1663 async fn test_memcached_slabs_empty() {
1664 let (_rx, mut client) = client!("STAT active_slabs 0\r\n", "STAT total_malloced 0\r\n", "END\r\n");
1665 let res = client.slabs().await.unwrap();
1666
1667 assert!(res.slabs.is_empty());
1668 }
1669
1670 #[tokio::test]
1671 async fn test_memcached_slabs_error() {
1672 let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1673 let res = client.slabs().await;
1674
1675 assert!(res.is_err());
1676 let err = res.unwrap_err();
1677 assert_eq!(ErrorKind::Protocol, err.kind());
1678 }
1679
1680 #[tokio::test]
1681 async fn test_memcached_slabs_success() {
1682 let (_rx, mut client) = client!(
1683 "STAT 6:chunk_size 304\r\n",
1684 "STAT 6:chunks_per_page 3449\r\n",
1685 "STAT 6:total_pages 1\r\n",
1686 "STAT 6:total_chunks 3449\r\n",
1687 "STAT 6:used_chunks 1\r\n",
1688 "STAT 6:free_chunks 3448\r\n",
1689 "STAT 6:free_chunks_end 0\r\n",
1690 "STAT 6:get_hits 951\r\n",
1691 "STAT 6:cmd_set 100\r\n",
1692 "STAT 6:delete_hits 0\r\n",
1693 "STAT 6:incr_hits 0\r\n",
1694 "STAT 6:decr_hits 0\r\n",
1695 "STAT 6:cas_hits 0\r\n",
1696 "STAT 6:cas_badval 0\r\n",
1697 "STAT 6:touch_hits 0\r\n",
1698 "STAT 7:chunk_size 384\r\n",
1699 "STAT 7:chunks_per_page 2730\r\n",
1700 "STAT 7:total_pages 1\r\n",
1701 "STAT 7:total_chunks 2730\r\n",
1702 "STAT 7:used_chunks 5\r\n",
1703 "STAT 7:free_chunks 2725\r\n",
1704 "STAT 7:free_chunks_end 0\r\n",
1705 "STAT 7:get_hits 4792\r\n",
1706 "STAT 7:cmd_set 520\r\n",
1707 "STAT 7:delete_hits 0\r\n",
1708 "STAT 7:incr_hits 0\r\n",
1709 "STAT 7:decr_hits 0\r\n",
1710 "STAT 7:cas_hits 0\r\n",
1711 "STAT 7:cas_badval 0\r\n",
1712 "STAT 7:touch_hits 0\r\n",
1713 "STAT active_slabs 2\r\n",
1714 "STAT total_malloced 30408704\r\n",
1715 "END\r\n",
1716 );
1717 let res = client.slabs().await.unwrap();
1718
1719 let expected = vec![
1720 Slab {
1721 id: 6,
1722 chunk_size: 304,
1723 chunks_per_page: 3449,
1724 total_pages: 1,
1725 total_chunks: 3449,
1726 used_chunks: 1,
1727 free_chunks: 3448,
1728 get_hits: 951,
1729 cmd_set: 100,
1730 delete_hits: 0,
1731 incr_hits: 0,
1732 decr_hits: 0,
1733 cas_hits: 0,
1734 cas_badval: 0,
1735 touch_hits: 0,
1736 },
1737 Slab {
1738 id: 7,
1739 chunk_size: 384,
1740 chunks_per_page: 2730,
1741 total_pages: 1,
1742 total_chunks: 2730,
1743 used_chunks: 5,
1744 free_chunks: 2725,
1745 get_hits: 4792,
1746 cmd_set: 520,
1747 delete_hits: 0,
1748 incr_hits: 0,
1749 decr_hits: 0,
1750 cas_hits: 0,
1751 cas_badval: 0,
1752 touch_hits: 0,
1753 },
1754 ];
1755
1756 assert_eq!(expected, res.slabs);
1757 }
1758
1759 #[tokio::test]
1764 async fn test_memcached_items_empty() {
1765 let (_rx, mut client) = client!();
1766 let res = client.items().await.unwrap();
1767
1768 assert!(res.is_empty());
1769 }
1770
1771 #[tokio::test]
1772 async fn test_memcached_items_error() {
1773 let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1774 let res = client.items().await;
1775
1776 assert!(res.is_err());
1777 let err = res.unwrap_err();
1778 assert_eq!(ErrorKind::Protocol, err.kind());
1779 }
1780
1781 #[tokio::test]
1782 async fn test_memcached_items_success() {
1783 let (_rx, mut client) = client!(
1784 "STAT items:39:number 3\r\n",
1785 "STAT items:39:number_hot 0\r\n",
1786 "STAT items:39:number_warm 1\r\n",
1787 "STAT items:39:number_cold 2\r\n",
1788 "STAT items:39:age_hot 0\r\n",
1789 "STAT items:39:age_warm 7\r\n",
1790 "STAT items:39:age 8\r\n",
1791 "STAT items:39:mem_requested 1535788\r\n",
1792 "STAT items:39:evicted 1646\r\n",
1793 "STAT items:39:evicted_nonzero 1646\r\n",
1794 "STAT items:39:evicted_time 0\r\n",
1795 "STAT items:39:outofmemory 9\r\n",
1796 "STAT items:39:tailrepairs 0\r\n",
1797 "STAT items:39:reclaimed 13\r\n",
1798 "STAT items:39:expired_unfetched 4\r\n",
1799 "STAT items:39:evicted_unfetched 202\r\n",
1800 "STAT items:39:evicted_active 6\r\n",
1801 "STAT items:39:crawler_reclaimed 0\r\n",
1802 "STAT items:39:crawler_items_checked 40\r\n",
1803 "STAT items:39:lrutail_reflocked 17365\r\n",
1804 "STAT items:39:moves_to_cold 8703\r\n",
1805 "STAT items:39:moves_to_warm 7285\r\n",
1806 "STAT items:39:moves_within_lru 3651\r\n",
1807 "STAT items:39:direct_reclaims 1949\r\n",
1808 "STAT items:39:hits_to_hot 894\r\n",
1809 "STAT items:39:hits_to_warm 4079\r\n",
1810 "STAT items:39:hits_to_cold 8043\r\n",
1811 "STAT items:39:hits_to_temp 0\r\n",
1812 "END\r\n",
1813 );
1814 let res = client.items().await.unwrap();
1815
1816 let expected = SlabItems {
1817 items: vec![SlabItem {
1818 id: 39,
1819 number: 3,
1820 number_hot: 0,
1821 number_warm: 1,
1822 number_cold: 2,
1823 age_hot: 0,
1824 age_warm: 7,
1825 age: 8,
1826 mem_requested: 1535788,
1827 evicted: 1646,
1828 evicted_nonzero: 1646,
1829 evicted_time: 0,
1830 out_of_memory: 9,
1831 tail_repairs: 0,
1832 reclaimed: 13,
1833 expired_unfetched: 4,
1834 evicted_unfetched: 202,
1835 evicted_active: 6,
1836 crawler_reclaimed: 0,
1837 crawler_items_checked: 40,
1838 lrutail_reflocked: 17365,
1839 moves_to_cold: 8703,
1840 moves_to_warm: 7285,
1841 moves_within_lru: 3651,
1842 direct_reclaims: 1949,
1843 hits_to_hot: 894,
1844 hits_to_warm: 4079,
1845 hits_to_cold: 8043,
1846 hits_to_temp: 0,
1847 }],
1848 };
1849
1850 assert_eq!(expected, res);
1851 }
1852
1853 #[tokio::test]
1858 async fn test_memcached_metas_empty() {
1859 let (_rx, mut client) = client!();
1860 let res = client.metas().await.unwrap();
1861
1862 assert!(res.is_empty());
1863 }
1864
1865 #[tokio::test]
1866 async fn test_memcached_metas_error() {
1867 let (_rx, mut client) = client!("BUSY crawler is busy\r\n",);
1868 let res = client.metas().await;
1869
1870 assert!(res.is_err());
1871 let err = res.unwrap_err();
1872 assert_eq!(ErrorKind::Protocol, err.kind());
1873 }
1874
1875 #[tokio::test]
1876 async fn test_memcached_metas_success() {
1877 let (_rx, mut client) = client!(
1878 "key=memcached%2Fmurmur3_hash.c exp=1687216956 la=1687216656 cas=259502 fetch=yes cls=17 size=2912\r\n",
1879 "key=memcached%2Fmd5.h exp=1687216956 la=1687216656 cas=259731 fetch=yes cls=17 size=3593\r\n",
1880 "END\r\n",
1881 );
1882 let res = client.metas().await.unwrap();
1883
1884 let expected = vec![
1885 Meta {
1886 key: "memcached/murmur3_hash.c".to_string(),
1887 expires: 1687216956,
1888 size: 2912,
1889 },
1890 Meta {
1891 key: "memcached/md5.h".to_string(),
1892 expires: 1687216956,
1893 size: 3593,
1894 },
1895 ];
1896
1897 assert_eq!(expected, res);
1898 }
1899}