1use std::borrow::Borrow;
2use std::cmp::Ordering;
3use std::collections::{BTreeSet, HashMap};
4use std::error;
5use std::fmt;
6use std::io;
7use std::ops::Deref;
8use std::str::FromStr;
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, Lines};
11
/// Server-wide statistics parsed from the key/value pairs of a `stats`
/// response.
///
/// Unless noted on the field, each field is filled from the response entry
/// with the same name by the `TryFrom<&HashMap<String, String>>`
/// implementation for this type.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Stats {
    // Process and server information.
    pub pid: i64,
    pub uptime: u64,
    /// Filled from the `time` entry of the response.
    pub server_time: i64,
    pub threads: u64,
    pub version: String,

    // Resource usage; parsed as floating point values.
    pub rusage_user: f64,
    pub rusage_system: f64,

    // Connection counters.
    pub max_connections: u64,
    pub curr_connections: u64,
    pub total_connections: u64,
    pub rejected_connections: u64,

    // Per-command counters.
    pub cmd_get: u64,
    pub cmd_set: u64,
    pub cmd_flush: u64,
    pub cmd_touch: u64,
    pub cmd_meta: u64,

    // Retrieval outcome counters.
    pub get_hits: u64,
    pub get_misses: u64,
    pub get_expired: u64,
    pub get_flushed: u64,

    // Storage failure counters.
    pub store_too_large: u64,
    pub store_no_memory: u64,

    // Delete outcome counters.
    pub delete_hits: u64,
    pub delete_misses: u64,

    // Increment / decrement outcome counters.
    pub incr_hits: u64,
    pub incr_misses: u64,
    pub decr_hits: u64,
    pub decr_misses: u64,

    // Touch outcome counters.
    pub touch_hits: u64,
    pub touch_misses: u64,

    // Byte counters.
    pub bytes_read: u64,
    pub bytes_written: u64,
    pub bytes: u64,
    /// Filled from the `limit_maxbytes` entry of the response.
    pub max_bytes: u64,

    // Item counters.
    pub curr_items: u64,
    pub total_items: u64,
    pub evictions: u64,
}
73
74impl TryFrom<&HashMap<String, String>> for Stats {
75 type Error = MtopError;
76
77 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
78 Ok(Stats {
79 pid: parse_field("pid", value)?,
80 uptime: parse_field("uptime", value)?,
81 server_time: parse_field("time", value)?,
82 version: parse_field("version", value)?,
83 threads: parse_field("threads", value)?,
84
85 rusage_user: parse_field("rusage_user", value)?,
86 rusage_system: parse_field("rusage_system", value)?,
87
88 max_connections: parse_field("max_connections", value)?,
89 curr_connections: parse_field("curr_connections", value)?,
90 total_connections: parse_field("total_connections", value)?,
91 rejected_connections: parse_field("rejected_connections", value)?,
92
93 cmd_get: parse_field("cmd_get", value)?,
94 cmd_set: parse_field("cmd_set", value)?,
95 cmd_flush: parse_field("cmd_flush", value)?,
96 cmd_touch: parse_field("cmd_touch", value)?,
97 cmd_meta: parse_field("cmd_meta", value)?,
98
99 get_hits: parse_field("get_hits", value)?,
100 get_misses: parse_field("get_misses", value)?,
101 get_expired: parse_field("get_expired", value)?,
102 get_flushed: parse_field("get_flushed", value)?,
103
104 store_too_large: parse_field("store_too_large", value)?,
105 store_no_memory: parse_field("store_no_memory", value)?,
106
107 delete_hits: parse_field("delete_hits", value)?,
108 delete_misses: parse_field("delete_misses", value)?,
109
110 incr_hits: parse_field("incr_hits", value)?,
111 incr_misses: parse_field("incr_misses", value)?,
112
113 decr_hits: parse_field("decr_hits", value)?,
114 decr_misses: parse_field("decr_misses", value)?,
115
116 touch_hits: parse_field("touch_hits", value)?,
117 touch_misses: parse_field("touch_misses", value)?,
118
119 bytes_read: parse_field("bytes_read", value)?,
120 bytes_written: parse_field("bytes_written", value)?,
121 bytes: parse_field("bytes", value)?,
122 max_bytes: parse_field("limit_maxbytes", value)?,
123
124 curr_items: parse_field("curr_items", value)?,
125 total_items: parse_field("total_items", value)?,
126 evictions: parse_field("evictions", value)?,
127 })
128 }
129}
130
/// Statistics for a single slab class.
///
/// Instances are built by the `TryFrom<&HashMap<String, String>>`
/// implementation for [`Slabs`] from entries named `<id>:<field>`.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Slab {
    pub id: u64,
    pub chunk_size: u64,
    pub chunks_per_page: u64,
    pub total_pages: u64,
    pub total_chunks: u64,
    pub used_chunks: u64,
    pub free_chunks: u64,
    pub get_hits: u64,
    pub cmd_set: u64,
    pub delete_hits: u64,
    pub incr_hits: u64,
    pub decr_hits: u64,
    pub cas_hits: u64,
    pub cas_badval: u64,
    pub touch_hits: u64,
}

impl PartialOrd for Slab {
    /// Delegates to [`Ord::cmp`], so the result is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Slab {
    /// Orders slabs by `id` alone.
    ///
    /// Note that the derived `PartialEq` compares every field, so two slabs
    /// with the same `id` can compare as `Equal` here without being `==`.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.id, &other.id)
    }
}

/// Collection of [`Slab`] entries in the order they were stored.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
#[repr(transparent)]
pub struct Slabs {
    slabs: Vec<Slab>,
}

impl Slabs {
    /// Iterates over the slabs by reference, in stored order.
    pub fn iter(&self) -> impl ExactSizeIterator<Item = &Slab> {
        self.slabs.iter()
    }

    /// Number of slabs held.
    pub fn len(&self) -> usize {
        self.slabs.len()
    }

    /// `true` when no slabs are held.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Finds the slab an item of `size` bytes would fall into.
    ///
    /// Returns the first slab whose `chunk_size` is at least `size`. When no
    /// slab is large enough the last slab is returned instead, and `None` is
    /// returned only when the collection is empty.
    ///
    /// NOTE(review): `partition_point` assumes the slabs are stored with
    /// ascending `chunk_size`; confirm that holds for every producer.
    pub fn find_for_size(&self, size: u64) -> Option<&Slab> {
        let first_fit = self.slabs.partition_point(|slab| slab.chunk_size < size);
        match self.slabs.get(first_fit) {
            Some(slab) => Some(slab),
            None => self.slabs.last(),
        }
    }
}

impl IntoIterator for Slabs {
    type Item = Slab;
    type IntoIter = std::vec::IntoIter<Slab>;

    /// Consumes the collection, yielding owned slabs in stored order.
    fn into_iter(self) -> Self::IntoIter {
        let Slabs { slabs } = self;
        slabs.into_iter()
    }
}
199
200impl TryFrom<&HashMap<String, String>> for Slabs {
201 type Error = MtopError;
202
203 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
204 let mut ids = BTreeSet::new();
209 for k in value.keys() {
210 let key_id: Option<u64> = k.split_once(':').map(|(raw, _rest)| raw).and_then(|raw| raw.parse().ok());
211
212 if let Some(id) = key_id {
213 ids.insert(id);
214 }
215 }
216
217 let mut slabs = Vec::with_capacity(ids.len());
218
219 for id in ids {
220 slabs.push(Slab {
221 id,
222 chunk_size: parse_field(&format!("{}:chunk_size", id), value)?,
223 chunks_per_page: parse_field(&format!("{}:chunks_per_page", id), value)?,
224 total_pages: parse_field(&format!("{}:total_pages", id), value)?,
225 total_chunks: parse_field(&format!("{}:total_chunks", id), value)?,
226 used_chunks: parse_field(&format!("{}:used_chunks", id), value)?,
227 free_chunks: parse_field(&format!("{}:free_chunks", id), value)?,
228 get_hits: parse_field(&format!("{}:get_hits", id), value)?,
229 cmd_set: parse_field(&format!("{}:cmd_set", id), value)?,
230 delete_hits: parse_field(&format!("{}:delete_hits", id), value)?,
231 incr_hits: parse_field(&format!("{}:incr_hits", id), value)?,
232 decr_hits: parse_field(&format!("{}:decr_hits", id), value)?,
233 cas_hits: parse_field(&format!("{}:cas_hits", id), value)?,
234 cas_badval: parse_field(&format!("{}:cas_badval", id), value)?,
235 touch_hits: parse_field(&format!("{}:touch_hits", id), value)?,
236 })
237 }
238
239 Ok(Self { slabs })
240 }
241}
242
/// Item statistics for a single slab class.
///
/// Instances are built by the `TryFrom<&HashMap<String, String>>`
/// implementation for [`SlabItems`] from entries named
/// `items:<id>:<field>`.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct SlabItem {
    pub id: u64,
    pub number: u64,
    pub number_hot: u64,
    pub number_warm: u64,
    pub number_cold: u64,
    pub age_hot: u64,
    pub age_warm: u64,
    pub age: u64,
    pub mem_requested: u64,
    pub evicted: u64,
    pub evicted_nonzero: u64,
    pub evicted_time: u64,
    pub out_of_memory: u64,
    pub tail_repairs: u64,
    pub reclaimed: u64,
    pub expired_unfetched: u64,
    pub evicted_unfetched: u64,
    pub evicted_active: u64,
    pub crawler_reclaimed: u64,
    pub crawler_items_checked: u64,
    pub lrutail_reflocked: u64,
    pub moves_to_cold: u64,
    pub moves_to_warm: u64,
    pub moves_within_lru: u64,
    pub direct_reclaims: u64,
    pub hits_to_hot: u64,
    pub hits_to_warm: u64,
    pub hits_to_cold: u64,
    pub hits_to_temp: u64,
}

impl PartialOrd for SlabItem {
    /// Delegates to [`Ord::cmp`], so the result is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for SlabItem {
    /// Orders entries by `id` alone; the remaining fields are ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.id, &other.id)
    }
}

/// Collection of [`SlabItem`] entries in the order they were stored.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
#[repr(transparent)]
pub struct SlabItems {
    items: Vec<SlabItem>,
}

impl SlabItems {
    /// Iterates over the entries by reference, in stored order.
    pub fn iter(&self) -> impl ExactSizeIterator<Item = &SlabItem> {
        self.items.iter()
    }

    /// Number of entries held.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// `true` when no entries are held.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl IntoIterator for SlabItems {
    type Item = SlabItem;
    type IntoIter = std::vec::IntoIter<SlabItem>;

    /// Consumes the collection, yielding owned entries in stored order.
    fn into_iter(self) -> Self::IntoIter {
        let SlabItems { items } = self;
        items.into_iter()
    }
}
316
317impl TryFrom<&HashMap<String, String>> for SlabItems {
318 type Error = MtopError;
319
320 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
321 let mut ids = BTreeSet::new();
326 for k in value.keys() {
327 let key_id: Option<u64> = k
328 .trim_start_matches("items:")
329 .split_once(':')
330 .map(|(raw, _rest)| raw)
331 .and_then(|raw| raw.parse().ok());
332
333 if let Some(id) = key_id {
334 ids.insert(id);
335 }
336 }
337
338 let mut items = Vec::with_capacity(ids.len());
339
340 for id in ids {
341 items.push(SlabItem {
342 id,
343 number: parse_field(&format!("items:{}:number", id), value)?,
344 number_hot: parse_field(&format!("items:{}:number_hot", id), value)?,
345 number_warm: parse_field(&format!("items:{}:number_warm", id), value)?,
346 number_cold: parse_field(&format!("items:{}:number_cold", id), value)?,
347 age_hot: parse_field(&format!("items:{}:age_hot", id), value)?,
348 age_warm: parse_field(&format!("items:{}:age_warm", id), value)?,
349 age: parse_field(&format!("items:{}:age", id), value)?,
350 mem_requested: parse_field(&format!("items:{}:mem_requested", id), value)?,
351 evicted: parse_field(&format!("items:{}:evicted", id), value)?,
352 evicted_nonzero: parse_field(&format!("items:{}:evicted_nonzero", id), value)?,
353 evicted_time: parse_field(&format!("items:{}:evicted_time", id), value)?,
354 out_of_memory: parse_field(&format!("items:{}:outofmemory", id), value)?,
355 tail_repairs: parse_field(&format!("items:{}:tailrepairs", id), value)?,
356 reclaimed: parse_field(&format!("items:{}:reclaimed", id), value)?,
357 expired_unfetched: parse_field(&format!("items:{}:expired_unfetched", id), value)?,
358 evicted_unfetched: parse_field(&format!("items:{}:evicted_unfetched", id), value)?,
359 evicted_active: parse_field(&format!("items:{}:evicted_active", id), value)?,
360 crawler_reclaimed: parse_field(&format!("items:{}:crawler_reclaimed", id), value)?,
361 crawler_items_checked: parse_field(&format!("items:{}:crawler_items_checked", id), value)?,
362 lrutail_reflocked: parse_field(&format!("items:{}:lrutail_reflocked", id), value)?,
363 moves_to_cold: parse_field(&format!("items:{}:moves_to_cold", id), value)?,
364 moves_to_warm: parse_field(&format!("items:{}:moves_to_warm", id), value)?,
365 moves_within_lru: parse_field(&format!("items:{}:moves_within_lru", id), value)?,
366 direct_reclaims: parse_field(&format!("items:{}:direct_reclaims", id), value)?,
367 hits_to_hot: parse_field(&format!("items:{}:hits_to_hot", id), value)?,
368 hits_to_warm: parse_field(&format!("items:{}:hits_to_warm", id), value)?,
369 hits_to_cold: parse_field(&format!("items:{}:hits_to_cold", id), value)?,
370 hits_to_temp: parse_field(&format!("items:{}:hits_to_temp", id), value)?,
371 })
372 }
373
374 Ok(Self { items })
375 }
376}
377
/// Metadata about a single cached item, as reported by one line of the
/// crawler metadump.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Meta {
    /// Filled from the `key` entry of the metadump line.
    pub key: String,
    /// Filled from the `exp` entry of the metadump line.
    pub expires: i64,
    /// Filled from the `size` entry of the metadump line.
    pub size: u64,
}

impl Meta {
    /// Names of the metadump entries this type is built from; passed to
    /// `Memcached::parse_crawler_meta` so that all other entries are skipped.
    const KEYS: &'static [&'static str] = &["key", "exp", "size"];
}
391
392impl TryFrom<&HashMap<String, String>> for Meta {
393 type Error = MtopError;
394
395 fn try_from(value: &HashMap<String, String>) -> Result<Self, Self::Error> {
396 Ok(Meta {
397 key: parse_field("key", value)?,
398 expires: parse_field("exp", value)?,
399 size: parse_field("size", value)?,
400 })
401 }
402}
403
404impl PartialOrd for Meta {
405 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
406 Some(self.cmp(other))
407 }
408}
409
410impl Ord for Meta {
411 fn cmp(&self, other: &Self) -> Ordering {
412 self.key.cmp(&other.key)
413 }
414}
415
/// A value fetched from the server together with the metadata reported on
/// its `VALUE` response line.
#[derive(Debug, Default, PartialEq, Eq, Clone, Hash)]
pub struct Value {
    pub key: String,
    pub cas: u64,
    pub flags: u64,
    pub data: Vec<u8>,
}

impl PartialOrd for Value {
    /// Delegates to [`Ord::cmp`], so the result is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Value {
    /// Orders values by `key` alone; the other fields are ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.as_str().cmp(other.key.as_str())
    }
}
435
436fn parse_field<T>(key: &str, map: &HashMap<String, String>) -> Result<T, MtopError>
437where
438 T: FromStr,
439 <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
440{
441 map.get(key)
442 .ok_or_else(|| MtopError::runtime(format!("field {} missing", key)))
443 .and_then(|v| {
444 v.parse()
445 .map_err(|e| MtopError::runtime_cause(format!("field {} value '{}'", key, v), e))
446 })
447}
448
449fn parse_value<T>(val: &str, line: &str) -> Result<T, MtopError>
450where
451 T: FromStr + fmt::Display,
452 <T as FromStr>::Err: fmt::Display + Send + Sync + error::Error + 'static,
453{
454 val.parse()
455 .map_err(|e| MtopError::runtime_cause(format!("parsing {} from '{}'", val, line), e))
456}
457
/// Broad category of an `MtopError`.
#[derive(Debug, PartialOrd, PartialEq, Copy, Clone)]
pub enum ErrorKind {
    Runtime,
    IO,
    Protocol,
    Configuration,
}

impl fmt::Display for ErrorKind {
    /// Writes a fixed lower-case label for the category. Width and
    /// alignment flags on the formatter are ignored.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Runtime => "runtime error",
            Self::IO => "io error",
            Self::Protocol => "protocol error",
            Self::Configuration => "configuration error",
        };

        f.write_str(label)
    }
}
476
/// Internal payload of an `MtopError`: a message, an underlying cause, or
/// both.
#[derive(Debug)]
enum ErrorRepr {
    /// Only a human-readable message.
    Message(String),
    /// Only an underlying error.
    Cause(Box<dyn error::Error + Send + Sync + 'static>),
    /// A human-readable message plus the underlying error.
    MessageCause(String, Box<dyn error::Error + Send + Sync + 'static>),
}

/// Error type returned by the operations in this file: an `ErrorKind`
/// category plus a message and/or an underlying cause.
#[derive(Debug)]
pub struct MtopError {
    kind: ErrorKind,
    repr: ErrorRepr,
}
489
490impl MtopError {
491 pub fn runtime<S>(msg: S) -> MtopError
492 where
493 S: Into<String>,
494 {
495 MtopError {
496 kind: ErrorKind::Runtime,
497 repr: ErrorRepr::Message(msg.into()),
498 }
499 }
500
501 pub fn runtime_cause<S, E>(msg: S, e: E) -> MtopError
502 where
503 S: Into<String>,
504 E: error::Error + Send + Sync + 'static,
505 {
506 MtopError {
507 kind: ErrorKind::Runtime,
508 repr: ErrorRepr::MessageCause(msg.into(), Box::new(e)),
509 }
510 }
511
512 pub fn configuration<S>(msg: S) -> MtopError
513 where
514 S: Into<String>,
515 {
516 MtopError {
517 kind: ErrorKind::Configuration,
518 repr: ErrorRepr::Message(msg.into()),
519 }
520 }
521
522 pub fn configuration_cause<S, E>(msg: S, e: E) -> MtopError
523 where
524 S: Into<String>,
525 E: error::Error + Send + Sync + 'static,
526 {
527 MtopError {
528 kind: ErrorKind::Configuration,
529 repr: ErrorRepr::MessageCause(msg.into(), Box::new(e)),
530 }
531 }
532
533 pub fn timeout<D>(t: Duration, operation: D) -> MtopError
534 where
535 D: fmt::Display,
536 {
537 MtopError {
538 kind: ErrorKind::IO,
539 repr: ErrorRepr::Message(format!("operation {} timed out after {:?}", operation, t)),
540 }
541 }
542
543 pub fn kind(&self) -> ErrorKind {
544 self.kind
545 }
546}
547
548impl fmt::Display for MtopError {
549 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550 match &self.repr {
551 ErrorRepr::Message(msg) => write!(f, "{}: {}", self.kind, msg),
552 ErrorRepr::Cause(e) => write!(f, "{}: {}", self.kind, e),
553 ErrorRepr::MessageCause(msg, e) => write!(f, "{}: {}: {}", self.kind, msg, e),
554 }
555 }
556}
557
558impl error::Error for MtopError {
559 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
560 match &self.repr {
561 ErrorRepr::Message(_) => None,
562 ErrorRepr::Cause(e) => Some(e.as_ref()),
563 ErrorRepr::MessageCause(_, e) => Some(e.as_ref()),
564 }
565 }
566}
567
568impl From<(String, io::Error)> for MtopError {
569 fn from((s, e): (String, io::Error)) -> Self {
570 MtopError {
571 kind: ErrorKind::IO,
572 repr: ErrorRepr::MessageCause(s, Box::new(e)),
573 }
574 }
575}
576
577impl From<io::Error> for MtopError {
578 fn from(e: io::Error) -> Self {
579 MtopError {
580 kind: ErrorKind::IO,
581 repr: ErrorRepr::Cause(Box::new(e)),
582 }
583 }
584}
585
586impl From<ProtocolError> for MtopError {
587 fn from(e: ProtocolError) -> Self {
588 MtopError {
589 kind: ErrorKind::Protocol,
590 repr: ErrorRepr::Cause(Box::new(e)),
591 }
592 }
593}
594
/// Kind of error line a server response can start with.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum ProtocolErrorKind {
    BadClass,
    Busy,
    Client,
    NotFound,
    NotStored,
    Server,
    Syntax,
}

impl fmt::Display for ProtocolErrorKind {
    /// Writes the response keyword for this kind (for example `NOT_FOUND`),
    /// honouring any width or alignment requested on the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::BadClass => "BADCLASS",
            Self::Busy => "BUSY",
            Self::Client => "CLIENT_ERROR",
            Self::NotFound => "NOT_FOUND",
            Self::NotStored => "NOT_STORED",
            Self::Server => "SERVER_ERROR",
            Self::Syntax => "ERROR",
        };

        // `pad` is what `str`'s own `Display` implementation does.
        f.pad(keyword)
    }
}

/// An error line returned by the server: its kind plus the optional text
/// that followed the keyword.
#[derive(Debug)]
pub struct ProtocolError {
    kind: ProtocolErrorKind,
    message: Option<String>,
}

impl ProtocolError {
    /// Returns the kind of error the server reported.
    pub fn kind(&self) -> ProtocolErrorKind {
        self.kind
    }
}

impl fmt::Display for ProtocolError {
    /// Formats as `<KEYWORD> <message>`, or just `<KEYWORD>` when the server
    /// sent no message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.message.as_deref() {
            Some(msg) => write!(f, "{} {}", self.kind, msg),
            None => write!(f, "{}", self.kind),
        }
    }
}

impl error::Error for ProtocolError {}
643
/// A request that can be serialized to bytes and sent to the server.
///
/// The wire form of each variant is produced by the
/// `From<Command> for Vec<u8>` implementation.
#[derive(Debug, Eq, PartialEq, Clone)]
enum Command<'a> {
    /// `add` storage command: key, flags, TTL, payload.
    Add(&'a Key, u64, u32, &'a [u8]),
    /// `lru_crawler metadump hash`.
    CrawlerMetadump,
    /// `decr`: key and delta.
    Decr(&'a Key, u64),
    /// `delete`: key.
    Delete(&'a Key),
    /// `flush_all` with no argument.
    FlushAll,
    /// `flush_all` followed by a numeric argument.
    FlushAllWait(u64),
    /// `gets`: the keys, space separated.
    Gets(&'a [Key]),
    /// `incr`: key and delta.
    Incr(&'a Key, u64),
    /// `replace` storage command: key, flags, TTL, payload.
    Replace(&'a Key, u64, u32, &'a [u8]),
    /// `stats`.
    Stats,
    /// `stats items`.
    StatsItems,
    /// `stats slabs`.
    StatsSlabs,
    /// `set` storage command: key, flags, TTL, payload.
    Set(&'a Key, u64, u32, &'a [u8]),
    /// `touch`: key and TTL.
    Touch(&'a Key, u32),
    /// `version`.
    Version,
}
662
663impl<'a> From<Command<'a>> for Vec<u8> {
664 fn from(value: Command<'a>) -> Self {
665 match value {
666 Command::Add(key, flags, ttl, data) => storage_command("add", key, flags, ttl, data),
667 Command::CrawlerMetadump => "lru_crawler metadump hash\r\n".to_owned().into_bytes(),
668 Command::Decr(key, delta) => format!("decr {} {}\r\n", key, delta).into_bytes(),
669 Command::Delete(key) => format!("delete {}\r\n", key).into_bytes(),
670 Command::FlushAll => "flush_all\r\n".to_owned().into_bytes(),
671 Command::FlushAllWait(wait) => format!("flush_all {}\r\n", wait).into_bytes(),
672 Command::Gets(keys) => format!("gets {}\r\n", keys.join(" ")).into_bytes(),
673 Command::Incr(key, delta) => format!("incr {} {}\r\n", key, delta).into_bytes(),
674 Command::Replace(key, flags, ttl, data) => storage_command("replace", key, flags, ttl, data),
675 Command::Stats => "stats\r\n".to_owned().into_bytes(),
676 Command::StatsItems => "stats items\r\n".to_owned().into_bytes(),
677 Command::StatsSlabs => "stats slabs\r\n".to_owned().into_bytes(),
678 Command::Set(key, flags, ttl, data) => storage_command("set", key, flags, ttl, data),
679 Command::Touch(key, ttl) => format!("touch {} {}\r\n", key, ttl).into_bytes(),
680 Command::Version => "version\r\n".to_owned().into_bytes(),
681 }
682 }
683}
684
685fn storage_command(verb: &str, key: &Key, flags: u64, ttl: u32, data: &[u8]) -> Vec<u8> {
686 let mut bytes = Vec::with_capacity(key.len() + data.len() + 32);
687 io::Write::write_all(
688 &mut bytes,
689 format!("{} {} {} {} {}\r\n", verb, key, flags, ttl, data.len()).as_bytes(),
690 )
691 .unwrap();
692 io::Write::write_all(&mut bytes, data).unwrap();
693 io::Write::write_all(&mut bytes, "\r\n".as_bytes()).unwrap();
694 bytes
695}
696
/// Client for a single server connection, split into a read half and a
/// write half.
pub struct Memcached {
    /// Line-oriented, buffered reader over the boxed read half.
    read: Lines<BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>>,
    /// Buffered writer over the boxed write half.
    write: BufWriter<Box<dyn AsyncWrite + Send + Sync + Unpin>>,
}
701
702impl Memcached {
703 const MAX_PAYLOAD_SIZE: u64 = 1024 * 1024 * 1024;
704
705 pub fn new<R, W>(read: R, write: W) -> Self
706 where
707 R: AsyncRead + Send + Sync + Unpin + 'static,
708 W: AsyncWrite + Send + Sync + Unpin + 'static,
709 {
710 Memcached {
711 read: BufReader::<Box<dyn AsyncRead + Send + Sync + Unpin>>::new(Box::new(read)).lines(),
712 write: BufWriter::new(Box::new(write)),
713 }
714 }
715
716 pub async fn stats(&mut self) -> Result<Stats, MtopError> {
718 self.send(Command::Stats).await?;
719 let raw = self.read_stats_response().await?;
720 Stats::try_from(&raw)
721 }
722
723 pub async fn slabs(&mut self) -> Result<Slabs, MtopError> {
728 self.send(Command::StatsSlabs).await?;
729 let raw = self.read_stats_response().await?;
730 Slabs::try_from(&raw)
731 }
732
733 pub async fn items(&mut self) -> Result<SlabItems, MtopError> {
738 self.send(Command::StatsItems).await?;
739 let raw = self.read_stats_response().await?;
740 SlabItems::try_from(&raw)
741 }
742
743 async fn read_stats_response(&mut self) -> Result<HashMap<String, String>, MtopError> {
744 let mut out = HashMap::new();
745
746 while let Some(v) = self.read.next_line().await? {
747 if v == "END" {
748 break;
749 }
750
751 let (key, val) = Self::parse_stat_line(&v)?;
752 out.insert(key.to_owned(), val.to_owned());
753 }
754
755 Ok(out)
756 }
757
758 fn parse_stat_line(line: &str) -> Result<(&str, &str), MtopError> {
759 let mut parts = line.splitn(3, ' ');
760 match (parts.next(), parts.next(), parts.next()) {
761 (Some("STAT"), Some(key), Some(val)) => Ok((key, val)),
762 _ => {
763 if let Some(err) = Self::parse_error(line) {
764 Err(MtopError::from(err))
765 } else {
766 Err(MtopError::runtime(format!("unable to parse '{}'", line)))
767 }
768 }
769 }
770 }
771
772 pub async fn metas(&mut self) -> Result<Vec<Meta>, MtopError> {
776 self.send(Command::CrawlerMetadump).await?;
777 let mut out = Vec::new();
778 let mut raw = HashMap::new();
779
780 while let Some(v) = self.read.next_line().await? {
781 if v == "END" {
782 break;
783 }
784
785 if let Some(err) = Self::parse_error(&v) {
789 return Err(MtopError::from(err));
790 }
791
792 let item = Self::parse_crawler_meta(&v, Meta::KEYS, &mut raw)?;
793 out.push(item);
794 }
795
796 Ok(out)
797 }
798
799 fn parse_crawler_meta(line: &str, keys: &[&str], raw: &mut HashMap<String, String>) -> Result<Meta, MtopError> {
800 raw.clear();
802
803 for p in line.split(' ') {
804 let (key, val) = p
805 .split_once('=')
806 .ok_or_else(|| MtopError::runtime(format!("unexpected metadump format '{}'", line)))?;
807
808 if !keys.contains(&key) {
812 continue;
813 }
814
815 let decoded = urlencoding::decode(val)
816 .map_err(|e| MtopError::runtime_cause(format!("unexpected metadump encoding '{}'", line), e))?;
817 raw.insert(key.to_owned(), decoded.into_owned());
818 }
819
820 Meta::try_from(raw.deref())
821 }
822
823 pub async fn ping(&mut self) -> Result<(), MtopError> {
825 self.send(Command::Version).await?;
826 if let Some(v) = self.read.next_line().await? {
827 if let Some(e) = Self::parse_error(&v) {
828 return Err(MtopError::from(e));
829 }
830 if !v.starts_with("VERSION") {
831 return Err(MtopError::runtime(format!("unable to parse '{}'", v)));
832 }
833 }
834
835 Ok(())
836 }
837
838 pub async fn flush_all(&mut self, wait: Option<Duration>) -> Result<(), MtopError> {
841 let cmd = if let Some(d) = wait {
842 Command::FlushAllWait(d.as_secs())
843 } else {
844 Command::FlushAll
845 };
846
847 self.send(cmd).await?;
848 self.read_simple_response("OK").await
849 }
850
851 pub async fn get(&mut self, keys: &[Key]) -> Result<HashMap<String, Value>, MtopError> {
854 self.send(Command::Gets(keys)).await?;
855 let mut out = HashMap::with_capacity(keys.len());
856
857 while let Some(v) = self.read.next_line().await? {
858 if v == "END" {
859 break;
860 }
861
862 let value = self.parse_gets_value(&v).await?;
863 out.insert(value.key.clone(), value);
864 }
865
866 Ok(out)
867 }
868
869 async fn parse_gets_value(&mut self, line: &str) -> Result<Value, MtopError> {
870 let mut parts = line.splitn(5, ' ');
871
872 match (parts.next(), parts.next(), parts.next(), parts.next(), parts.next()) {
873 (Some("VALUE"), Some(k), Some(flags), Some(len), Some(cas)) => {
874 let flags: u64 = parse_value(flags, line)?;
875 let len: u64 = parse_value(len, line)?;
876 let cas: u64 = parse_value(cas, line)?;
877
878 if len > Self::MAX_PAYLOAD_SIZE {
883 return Err(MtopError::runtime(format!(
884 "server response of length {} exceeds client max of {}",
885 len,
886 Self::MAX_PAYLOAD_SIZE
887 )));
888 }
889
890 let mut data = Vec::with_capacity(len as usize + 2);
892 let reader = self.read.get_mut();
893 reader.take(len + 2).read_to_end(&mut data).await?;
894 data.truncate(len as usize);
895
896 Ok(Value {
897 key: k.to_owned(),
898 flags,
899 cas,
900 data,
901 })
902 }
903 _ => {
904 if let Some(err) = Self::parse_error(line) {
908 Err(MtopError::from(err))
909 } else {
910 Err(MtopError::runtime(format!("unable to parse '{}'", line)))
911 }
912 }
913 }
914 }
915
916 pub async fn incr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
919 self.send(Command::Incr(key, delta)).await?;
920 if let Some(v) = self.read.next_line().await? {
921 Self::parse_numeric_response(&v)
922 } else {
923 Err(MtopError::runtime("unexpected empty response"))
924 }
925 }
926
927 pub async fn decr(&mut self, key: &Key, delta: u64) -> Result<u64, MtopError> {
930 self.send(Command::Decr(key, delta)).await?;
931 if let Some(v) = self.read.next_line().await? {
932 Self::parse_numeric_response(&v)
933 } else {
934 Err(MtopError::runtime("unexpected empty response"))
935 }
936 }
937
938 fn parse_numeric_response(line: &str) -> Result<u64, MtopError> {
939 if let Some(err) = Self::parse_error(line) {
940 Err(MtopError::from(err))
941 } else {
942 line.parse()
943 .map_err(|_e| MtopError::runtime(format!("unable to parse '{}'", line)))
944 }
945 }
946
947 pub async fn set<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
949 where
950 V: AsRef<[u8]>,
951 {
952 self.send(Command::Set(key, flags, ttl, data.as_ref())).await?;
953 self.read_simple_response("STORED").await
954 }
955
956 pub async fn add<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
958 where
959 V: AsRef<[u8]>,
960 {
961 self.send(Command::Add(key, flags, ttl, data.as_ref())).await?;
962 self.read_simple_response("STORED").await
963 }
964
965 pub async fn replace<V>(&mut self, key: &Key, flags: u64, ttl: u32, data: V) -> Result<(), MtopError>
967 where
968 V: AsRef<[u8]>,
969 {
970 self.send(Command::Replace(key, flags, ttl, data.as_ref())).await?;
971 self.read_simple_response("STORED").await
972 }
973
974 pub async fn touch(&mut self, key: &Key, ttl: u32) -> Result<(), MtopError> {
976 self.send(Command::Touch(key, ttl)).await?;
977 self.read_simple_response("TOUCHED").await
978 }
979
980 pub async fn delete(&mut self, key: &Key) -> Result<(), MtopError> {
982 self.send(Command::Delete(key)).await?;
983 self.read_simple_response("DELETED").await
984 }
985
986 async fn read_simple_response(&mut self, expected: &str) -> Result<(), MtopError> {
987 match self.read.next_line().await? {
988 Some(line) if line == expected => Ok(()),
989 Some(line) => {
990 if let Some(err) = Self::parse_error(&line) {
991 Err(MtopError::from(err))
992 } else {
993 Err(MtopError::runtime(format!("unable to parse '{}'", line)))
994 }
995 }
996 None => Err(MtopError::runtime("unexpected empty response")),
997 }
998 }
999
1000 fn parse_error(line: &str) -> Option<ProtocolError> {
1001 let mut values = line.splitn(2, ' ');
1002 let (kind, message) = match (values.next(), values.next()) {
1003 (Some("BADCLASS"), Some(msg)) => (ProtocolErrorKind::BadClass, Some(msg.to_owned())),
1004 (Some("BUSY"), Some(msg)) => (ProtocolErrorKind::Busy, Some(msg.to_owned())),
1005 (Some("CLIENT_ERROR"), Some(msg)) => (ProtocolErrorKind::Client, Some(msg.to_owned())),
1006 (Some("ERROR"), None) => (ProtocolErrorKind::Syntax, None),
1007 (Some("ERROR"), Some(msg)) => (ProtocolErrorKind::Syntax, Some(msg.to_owned())),
1008 (Some("NOT_FOUND"), None) => (ProtocolErrorKind::NotFound, None),
1009 (Some("NOT_STORED"), None) => (ProtocolErrorKind::NotStored, None),
1010 (Some("SERVER_ERROR"), Some(msg)) => (ProtocolErrorKind::Server, Some(msg.to_owned())),
1011
1012 _ => return None,
1013 };
1014
1015 Some(ProtocolError { kind, message })
1016 }
1017
1018 async fn send(&mut self, cmd: Command<'_>) -> Result<(), MtopError> {
1019 let cmd_bytes: Vec<u8> = cmd.into();
1020 self.write.write_all(&cmd_bytes).await?;
1021 Ok(self.write.flush().await?)
1022 }
1023}
1024
1025impl fmt::Debug for Memcached {
1026 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1027 f.debug_struct("Memcached")
1028 .field("read", &"...")
1029 .field("write", &"...")
1030 .finish()
1031 }
1032}
1033
1034#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1035#[repr(transparent)]
1036pub struct Key(String);
1037
1038impl Key {
1039 const MAX_LENGTH: usize = 250;
1040
1041 pub fn one<T>(val: T) -> Result<Key, MtopError>
1042 where
1043 T: Into<String>,
1044 {
1045 let val = val.into();
1046 if !Self::is_legal_val(&val) {
1047 Err(MtopError::runtime(format!("invalid key {}", val)))
1048 } else {
1049 Ok(Key(val))
1050 }
1051 }
1052
1053 pub fn many<I, T>(vals: I) -> Result<Vec<Key>, MtopError>
1054 where
1055 I: IntoIterator<Item = T>,
1056 T: Into<String>,
1057 {
1058 let iter = vals.into_iter();
1059 let (sz, _) = iter.size_hint();
1060 let mut out = Vec::with_capacity(sz);
1061
1062 for val in iter {
1063 out.push(Self::one(val)?);
1064 }
1065
1066 Ok(out)
1067 }
1068
1069 pub fn len(&self) -> usize {
1070 self.0.len()
1071 }
1072
1073 pub fn is_empty(&self) -> bool {
1074 self.0.is_empty()
1075 }
1076
1077 fn is_legal_val(val: &str) -> bool {
1078 if val.len() > Self::MAX_LENGTH {
1079 return false;
1080 }
1081
1082 for c in val.chars() {
1083 if !c.is_ascii() || c.is_ascii_whitespace() || c.is_ascii_control() {
1084 return false;
1085 }
1086 }
1087
1088 true
1089 }
1090}
1091
1092impl fmt::Display for Key {
1093 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094 self.0.fmt(f)
1095 }
1096}
1097
1098impl AsRef<str> for Key {
1099 fn as_ref(&self) -> &str {
1100 &self.0
1101 }
1102}
1103
1104impl Borrow<str> for Key {
1105 fn borrow(&self) -> &str {
1106 &self.0
1107 }
1108}
1109
1110#[cfg(test)]
1111mod test {
1112 use super::{ErrorKind, Key, Memcached, Meta, Slab, SlabItem, SlabItems};
1113 use std::io::{Cursor, Error};
1114 use std::pin::Pin;
1115 use std::task::{Context, Poll};
1116 use std::time::Duration;
1117 use tokio::io::AsyncWrite;
1118 use tokio::sync::mpsc::{self, UnboundedSender};
1119
1120 #[test]
1125 fn test_key_one_length() {
1126 let val = "abc".repeat(Key::MAX_LENGTH);
1127 let res = Key::one(val);
1128 assert!(res.is_err());
1129 }
1130
1131 #[test]
1132 fn test_key_one_non_ascii() {
1133 let val = "🤦";
1134 let res = Key::one(val);
1135 assert!(res.is_err());
1136 }
1137
1138 #[test]
1139 fn test_key_one_whitespace() {
1140 let val = "some thing";
1141 let res = Key::one(val);
1142 assert!(res.is_err());
1143 }
1144
1145 #[test]
1146 fn test_key_one_control_char() {
1147 let val = "\x7F";
1148 let res = Key::one(val);
1149 assert!(res.is_err());
1150 }
1151
1152 #[test]
1153 fn test_key_one_success() {
1154 let val = "a-reasonable-key";
1155 let res = Key::one(val);
1156 assert!(res.is_ok());
1157 }
1158
1159 struct WriteAdapter {
1160 tx: UnboundedSender<Vec<u8>>,
1161 }
1162
1163 impl AsyncWrite for WriteAdapter {
1164 fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
1165 self.tx.send(buf.to_owned()).unwrap();
1166 Poll::Ready(Ok(buf.len()))
1167 }
1168
1169 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1170 Poll::Ready(Ok(()))
1171 }
1172
1173 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1174 Poll::Ready(Ok(()))
1175 }
1176 }
1177
 // Builds a `(receiver, client)` pair for tests. The client reads from an
 // in-memory cursor holding the given response lines and writes through a
 // `WriteAdapter`, so everything it sends can be read from the receiver.
 macro_rules! client {
     // No arguments: the client's read side is empty.
     () => ({
         let (tx, rx) = mpsc::unbounded_channel();
         let reads = Vec::new();
         (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
     });
     // One or more string expressions (trailing comma allowed): their bytes
     // are concatenated, in order, to form the data the client will read.
     ($($line:expr),+ $(,)?) => ({
         let (tx, rx) = mpsc::unbounded_channel();
         let mut reads = Vec::new();
         $(reads.extend_from_slice($line.as_bytes());)+
         (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx }))
     })
 }
1195
1196 #[tokio::test]
1201 async fn test_memcached_get_no_key() {
1202 let (_rx, mut client) = client!();
1203 let vals: Vec<String> = vec![];
1204 let keys = Key::many(vals).unwrap();
1205 let res = client.get(&keys).await.unwrap();
1206
1207 assert!(res.is_empty());
1208 }
1209
1210 #[tokio::test]
1211 async fn test_memcached_get_error() {
1212 let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1213 let keys = Key::many(vec!["foo", "baz"]).unwrap();
1214 let res = client.get(&keys).await;
1215
1216 assert!(res.is_err());
1217 let err = res.unwrap_err();
1218 assert_eq!(ErrorKind::Protocol, err.kind());
1219 }
1220
1221 #[tokio::test]
1222 async fn test_memcached_get_miss() {
1223 let (_rx, mut client) = client!("END\r\n");
1224 let keys = Key::many(vec!["foo", "baz"]).unwrap();
1225 let res = client.get(&keys).await.unwrap();
1226
1227 assert!(res.is_empty());
1228 }
1229
1230 #[tokio::test]
1231 async fn test_memcached_get_hit() {
1232 let (_rx, mut client) = client!(
1233 "VALUE foo 32 3 1\r\n",
1234 "bar\r\n",
1235 "VALUE baz 64 3 2\r\n",
1236 "qux\r\n",
1237 "END\r\n",
1238 );
1239 let keys = Key::many(vec!["foo", "baz"]).unwrap();
1240 let res = client.get(&keys).await.unwrap();
1241
1242 let val1 = res.get("foo").unwrap();
1243 assert_eq!("foo", val1.key);
1244 assert_eq!("bar".as_bytes(), val1.data);
1245 assert_eq!(32, val1.flags);
1246 assert_eq!(1, val1.cas);
1247
1248 let val2 = res.get("baz").unwrap();
1249 assert_eq!("baz", val2.key);
1250 assert_eq!("qux".as_bytes(), val2.data);
1251 assert_eq!(64, val2.flags);
1252 assert_eq!(2, val2.cas);
1253 }
1254
1255 #[tokio::test]
1260 async fn test_memcached_incr_bad_val() {
1261 let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1262 let key = Key::one("test").unwrap();
1263 let res = client.incr(&key, 2).await;
1264
1265 assert!(res.is_err());
1266 let err = res.unwrap_err();
1267 assert_eq!(ErrorKind::Protocol, err.kind());
1268
1269 let bytes = rx.recv().await.unwrap();
1270 let command = String::from_utf8(bytes).unwrap();
1271 assert_eq!("incr test 2\r\n", command);
1272 }
1273
1274 #[tokio::test]
1275 async fn test_memcached_incr_success() {
1276 let (mut rx, mut client) = client!("3\r\n");
1277 let key = Key::one("test").unwrap();
1278 let res = client.incr(&key, 2).await.unwrap();
1279
1280 assert_eq!(3, res);
1281 let bytes = rx.recv().await.unwrap();
1282 let command = String::from_utf8(bytes).unwrap();
1283 assert_eq!("incr test 2\r\n", command);
1284 }
1285
1286 #[tokio::test]
1291 async fn test_memcached_decr_bad_val() {
1292 let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n");
1293 let key = Key::one("test").unwrap();
1294 let res = client.decr(&key, 1).await;
1295
1296 assert!(res.is_err());
1297 let err = res.unwrap_err();
1298 assert_eq!(ErrorKind::Protocol, err.kind());
1299
1300 let bytes = rx.recv().await.unwrap();
1301 let command = String::from_utf8(bytes).unwrap();
1302 assert_eq!("decr test 1\r\n", command);
1303 }
1304
1305 #[tokio::test]
1306 async fn test_memcached_decr_success() {
1307 let (mut rx, mut client) = client!("3\r\n");
1308 let key = Key::one("test").unwrap();
1309 let res = client.decr(&key, 1).await.unwrap();
1310
1311 assert_eq!(3, res);
1312 let bytes = rx.recv().await.unwrap();
1313 let command = String::from_utf8(bytes).unwrap();
1314 assert_eq!("decr test 1\r\n", command);
1315 }
1316
1317 #[tokio::test]
1322 async fn test_memcached_ping_bad_val() {
1323 let (mut rx, mut client) = client!("220 localhost ESMTP Postfix\r\n");
1324 let res = client.ping().await;
1325
1326 assert!(res.is_err());
1327 let err = res.unwrap_err();
1328 assert_eq!(ErrorKind::Runtime, err.kind());
1329
1330 let bytes = rx.recv().await.unwrap();
1331 let command = String::from_utf8(bytes).unwrap();
1332 assert_eq!("version\r\n", command);
1333 }
1334
1335 #[tokio::test]
1336 async fn test_memcached_ping_success() {
1337 let (mut rx, mut client) = client!("VERSION 1.6.22\r\n");
1338 client.ping().await.unwrap();
1339
1340 let bytes = rx.recv().await.unwrap();
1341 let command = String::from_utf8(bytes).unwrap();
1342 assert_eq!("version\r\n", command);
1343 }
1344
1345 macro_rules! test_store_command_success {
1346 ($method:ident, $verb:expr) => {
1347 let (mut rx, mut client) = client!("STORED\r\n");
1348 let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1349
1350 assert!(res.is_ok());
1351 let bytes = rx.recv().await.unwrap();
1352 let command = String::from_utf8(bytes).unwrap();
1353 assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1354 };
1355 }
1356
1357 macro_rules! test_store_command_error {
1358 ($method:ident, $verb:expr) => {
1359 let (mut rx, mut client) = client!("NOT_STORED\r\n");
1360 let res = client.$method(&Key::one("test").unwrap(), 0, 300, "val".as_bytes()).await;
1361
1362 assert!(res.is_err());
1363 let err = res.unwrap_err();
1364 assert_eq!(ErrorKind::Protocol, err.kind());
1365
1366 let bytes = rx.recv().await.unwrap();
1367 let command = String::from_utf8(bytes).unwrap();
1368 assert_eq!(concat!($verb, " test 0 300 3\r\nval\r\n"), command);
1369 };
1370 }
1371
    /// `set` succeeds on a `STORED` reply and writes a `set` command line plus the value.
    #[tokio::test]
    async fn test_memcached_set_success() {
        test_store_command_success!(set, "set");
    }
1380
    /// `set` fails with `ErrorKind::Protocol` on a `NOT_STORED` reply.
    #[tokio::test]
    async fn test_memcached_set_error() {
        test_store_command_error!(set, "set");
    }
1385
    /// `add` succeeds on a `STORED` reply and writes an `add` command line plus the value.
    #[tokio::test]
    async fn test_memcached_add_success() {
        test_store_command_success!(add, "add");
    }
1394
    /// `add` fails with `ErrorKind::Protocol` on a `NOT_STORED` reply.
    #[tokio::test]
    async fn test_memcached_add_error() {
        test_store_command_error!(add, "add");
    }
1399
    /// `replace` succeeds on a `STORED` reply and writes a `replace` command line plus the value.
    #[tokio::test]
    async fn test_memcached_replace_success() {
        test_store_command_success!(replace, "replace");
    }
1408
    /// `replace` fails with `ErrorKind::Protocol` on a `NOT_STORED` reply.
    #[tokio::test]
    async fn test_memcached_replace_error() {
        test_store_command_error!(replace, "replace");
    }
1413
1414 #[tokio::test]
1419 async fn test_memcached_touch_success() {
1420 let (mut rx, mut client) = client!("TOUCHED\r\n");
1421 let key = Key::one("test").unwrap();
1422 let res = client.touch(&key, 300).await;
1423
1424 assert!(res.is_ok());
1425 let bytes = rx.recv().await.unwrap();
1426 let command = String::from_utf8(bytes).unwrap();
1427 assert_eq!("touch test 300\r\n", command);
1428 }
1429
1430 #[tokio::test]
1431 async fn test_memcached_touch_error() {
1432 let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1433 let key = Key::one("test").unwrap();
1434 let res = client.touch(&key, 300).await;
1435
1436 assert!(res.is_err());
1437 let err = res.unwrap_err();
1438 assert_eq!(ErrorKind::Protocol, err.kind());
1439
1440 let bytes = rx.recv().await.unwrap();
1441 let command = String::from_utf8(bytes).unwrap();
1442 assert_eq!("touch test 300\r\n", command);
1443 }
1444
1445 #[tokio::test]
1450 async fn test_memcached_delete_success() {
1451 let (mut rx, mut client) = client!("DELETED\r\n");
1452 let key = Key::one("test").unwrap();
1453 let res = client.delete(&key).await;
1454
1455 assert!(res.is_ok());
1456 let bytes = rx.recv().await.unwrap();
1457 let command = String::from_utf8(bytes).unwrap();
1458 assert_eq!("delete test\r\n", command);
1459 }
1460
1461 #[tokio::test]
1462 async fn test_memcached_delete_error() {
1463 let (mut rx, mut client) = client!("NOT_FOUND\r\n");
1464 let key = Key::one("test").unwrap();
1465 let res = client.delete(&key).await;
1466
1467 assert!(res.is_err());
1468 let err = res.unwrap_err();
1469 assert_eq!(ErrorKind::Protocol, err.kind());
1470
1471 let bytes = rx.recv().await.unwrap();
1472 let command = String::from_utf8(bytes).unwrap();
1473 assert_eq!("delete test\r\n", command);
1474 }
1475
1476 #[tokio::test]
1481 async fn test_memcached_flush_all_with_wait_success() {
1482 let (mut rx, mut client) = client!("OK\r\n");
1483 let wait = Some(Duration::from_secs(25));
1484 let res = client.flush_all(wait).await;
1485
1486 assert!(res.is_ok());
1487 let bytes = rx.recv().await.unwrap();
1488 let command = String::from_utf8(bytes).unwrap();
1489 assert_eq!("flush_all 25\r\n", command);
1490 }
1491
1492 #[tokio::test]
1493 async fn test_memcached_flush_all_with_wait_error() {
1494 let (mut rx, mut client) = client!("ERROR\r\n");
1495 let wait = Some(Duration::from_secs(25));
1496 let res = client.flush_all(wait).await;
1497
1498 assert!(res.is_err());
1499 let err = res.unwrap_err();
1500 assert_eq!(ErrorKind::Protocol, err.kind());
1501
1502 let bytes = rx.recv().await.unwrap();
1503 let command = String::from_utf8(bytes).unwrap();
1504 assert_eq!("flush_all 25\r\n", command);
1505 }
1506
1507 #[tokio::test]
1508 async fn test_memcached_flush_all_no_wait_success() {
1509 let (mut rx, mut client) = client!("OK\r\n");
1510 let res = client.flush_all(None).await;
1511
1512 assert!(res.is_ok());
1513 let bytes = rx.recv().await.unwrap();
1514 let command = String::from_utf8(bytes).unwrap();
1515 assert_eq!("flush_all\r\n", command);
1516 }
1517
1518 #[tokio::test]
1519 async fn test_memcached_flush_all_no_wait_error() {
1520 let (mut rx, mut client) = client!("ERROR\r\n");
1521 let res = client.flush_all(None).await;
1522
1523 assert!(res.is_err());
1524 let err = res.unwrap_err();
1525 assert_eq!(ErrorKind::Protocol, err.kind());
1526
1527 let bytes = rx.recv().await.unwrap();
1528 let command = String::from_utf8(bytes).unwrap();
1529 assert_eq!("flush_all\r\n", command);
1530 }
1531
1532 #[tokio::test]
1537 async fn test_memcached_stats_empty() {
1538 let (_rx, mut client) = client!("END\r\n");
1539 let res = client.stats().await;
1540
1541 assert!(res.is_err());
1542 let err = res.unwrap_err();
1543 assert_eq!(ErrorKind::Runtime, err.kind());
1544 }
1545
1546 #[tokio::test]
1547 async fn test_memcached_stats_error() {
1548 let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n");
1549 let res = client.stats().await;
1550
1551 assert!(res.is_err());
1552 let err = res.unwrap_err();
1553 assert_eq!(ErrorKind::Protocol, err.kind());
1554 }
1555
    /// Feeds a complete canned `stats` reply (a list of `STAT` lines terminated by `END`)
    /// to the client and spot-checks a few of the parsed counters.
    #[tokio::test]
    async fn test_memcached_stats_success() {
        // Canned server reply; the fixture includes many stats beyond the four asserted below.
        let (_rx, mut client) = client!(
            "STAT pid 1525\r\n",
            "STAT uptime 271984\r\n",
            "STAT time 1687212809\r\n",
            "STAT version 1.6.14\r\n",
            "STAT libevent 2.1.12-stable\r\n",
            "STAT pointer_size 64\r\n",
            "STAT rusage_user 17.544323\r\n",
            "STAT rusage_system 11.830461\r\n",
            "STAT max_connections 1024\r\n",
            "STAT curr_connections 1\r\n",
            "STAT total_connections 3\r\n",
            "STAT rejected_connections 0\r\n",
            "STAT connection_structures 2\r\n",
            "STAT response_obj_oom 0\r\n",
            "STAT response_obj_count 1\r\n",
            "STAT response_obj_bytes 32768\r\n",
            "STAT read_buf_count 4\r\n",
            "STAT read_buf_bytes 65536\r\n",
            "STAT read_buf_bytes_free 16384\r\n",
            "STAT read_buf_oom 0\r\n",
            "STAT reserved_fds 20\r\n",
            "STAT cmd_get 1\r\n",
            "STAT cmd_set 0\r\n",
            "STAT cmd_flush 0\r\n",
            "STAT cmd_touch 0\r\n",
            "STAT cmd_meta 0\r\n",
            "STAT get_hits 0\r\n",
            "STAT get_misses 1\r\n",
            "STAT get_expired 0\r\n",
            "STAT get_flushed 0\r\n",
            "STAT delete_misses 0\r\n",
            "STAT delete_hits 0\r\n",
            "STAT incr_misses 0\r\n",
            "STAT incr_hits 0\r\n",
            "STAT decr_misses 0\r\n",
            "STAT decr_hits 0\r\n",
            "STAT cas_misses 0\r\n",
            "STAT cas_hits 0\r\n",
            "STAT cas_badval 0\r\n",
            "STAT touch_hits 0\r\n",
            "STAT touch_misses 0\r\n",
            "STAT store_too_large 0\r\n",
            "STAT store_no_memory 0\r\n",
            "STAT auth_cmds 0\r\n",
            "STAT auth_errors 0\r\n",
            "STAT bytes_read 16\r\n",
            "STAT bytes_written 7\r\n",
            "STAT limit_maxbytes 67108864\r\n",
            "STAT accepting_conns 1\r\n",
            "STAT listen_disabled_num 0\r\n",
            "STAT time_in_listen_disabled_us 0\r\n",
            "STAT threads 4\r\n",
            "STAT conn_yields 0\r\n",
            "STAT hash_power_level 16\r\n",
            "STAT hash_bytes 524288\r\n",
            "STAT hash_is_expanding 0\r\n",
            "STAT slab_reassign_rescues 0\r\n",
            "STAT slab_reassign_chunk_rescues 0\r\n",
            "STAT slab_reassign_evictions_nomem 0\r\n",
            "STAT slab_reassign_inline_reclaim 0\r\n",
            "STAT slab_reassign_busy_items 0\r\n",
            "STAT slab_reassign_busy_deletes 0\r\n",
            "STAT slab_reassign_running 0\r\n",
            "STAT slabs_moved 0\r\n",
            "STAT lru_crawler_running 0\r\n",
            "STAT lru_crawler_starts 105\r\n",
            "STAT lru_maintainer_juggles 271976\r\n",
            "STAT malloc_fails 0\r\n",
            "STAT log_worker_dropped 0\r\n",
            "STAT log_worker_written 0\r\n",
            "STAT log_watcher_skipped 0\r\n",
            "STAT log_watcher_sent 0\r\n",
            "STAT log_watchers 0\r\n",
            "STAT unexpected_napi_ids 0\r\n",
            "STAT round_robin_fallback 0\r\n",
            "STAT bytes 0\r\n",
            "STAT curr_items 0\r\n",
            "STAT total_items 0\r\n",
            "STAT slab_global_page_pool 0\r\n",
            "STAT expired_unfetched 0\r\n",
            "STAT evicted_unfetched 0\r\n",
            "STAT evicted_active 0\r\n",
            "STAT evictions 0\r\n",
            "STAT reclaimed 0\r\n",
            "STAT crawler_reclaimed 0\r\n",
            "STAT crawler_items_checked 0\r\n",
            "STAT lrutail_reflocked 0\r\n",
            "STAT moves_to_cold 0\r\n",
            "STAT moves_to_warm 0\r\n",
            "STAT moves_within_lru 0\r\n",
            "STAT direct_reclaims 0\r\n",
            "STAT lru_bumps_dropped 0\r\n",
            "END\r\n",
        );
        let res = client.stats().await.unwrap();

        // Spot-check: these values must match the `cmd_*` / `get_*` lines in the fixture above.
        assert_eq!(0, res.cmd_set);
        assert_eq!(1, res.cmd_get);
        assert_eq!(1, res.get_misses);
        assert_eq!(0, res.get_hits);
    }
1660
1661 #[tokio::test]
1666 async fn test_memcached_slabs_empty() {
1667 let (_rx, mut client) = client!("STAT active_slabs 0\r\n", "STAT total_malloced 0\r\n", "END\r\n");
1668 let res = client.slabs().await.unwrap();
1669
1670 assert!(res.slabs.is_empty());
1671 }
1672
1673 #[tokio::test]
1674 async fn test_memcached_slabs_error() {
1675 let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1676 let res = client.slabs().await;
1677
1678 assert!(res.is_err());
1679 let err = res.unwrap_err();
1680 assert_eq!(ErrorKind::Protocol, err.kind());
1681 }
1682
1683 #[tokio::test]
1684 async fn test_memcached_slabs_success() {
1685 let (_rx, mut client) = client!(
1686 "STAT 6:chunk_size 304\r\n",
1687 "STAT 6:chunks_per_page 3449\r\n",
1688 "STAT 6:total_pages 1\r\n",
1689 "STAT 6:total_chunks 3449\r\n",
1690 "STAT 6:used_chunks 1\r\n",
1691 "STAT 6:free_chunks 3448\r\n",
1692 "STAT 6:free_chunks_end 0\r\n",
1693 "STAT 6:get_hits 951\r\n",
1694 "STAT 6:cmd_set 100\r\n",
1695 "STAT 6:delete_hits 0\r\n",
1696 "STAT 6:incr_hits 0\r\n",
1697 "STAT 6:decr_hits 0\r\n",
1698 "STAT 6:cas_hits 0\r\n",
1699 "STAT 6:cas_badval 0\r\n",
1700 "STAT 6:touch_hits 0\r\n",
1701 "STAT 7:chunk_size 384\r\n",
1702 "STAT 7:chunks_per_page 2730\r\n",
1703 "STAT 7:total_pages 1\r\n",
1704 "STAT 7:total_chunks 2730\r\n",
1705 "STAT 7:used_chunks 5\r\n",
1706 "STAT 7:free_chunks 2725\r\n",
1707 "STAT 7:free_chunks_end 0\r\n",
1708 "STAT 7:get_hits 4792\r\n",
1709 "STAT 7:cmd_set 520\r\n",
1710 "STAT 7:delete_hits 0\r\n",
1711 "STAT 7:incr_hits 0\r\n",
1712 "STAT 7:decr_hits 0\r\n",
1713 "STAT 7:cas_hits 0\r\n",
1714 "STAT 7:cas_badval 0\r\n",
1715 "STAT 7:touch_hits 0\r\n",
1716 "STAT active_slabs 2\r\n",
1717 "STAT total_malloced 30408704\r\n",
1718 "END\r\n",
1719 );
1720 let res = client.slabs().await.unwrap();
1721
1722 let expected = vec![
1723 Slab {
1724 id: 6,
1725 chunk_size: 304,
1726 chunks_per_page: 3449,
1727 total_pages: 1,
1728 total_chunks: 3449,
1729 used_chunks: 1,
1730 free_chunks: 3448,
1731 get_hits: 951,
1732 cmd_set: 100,
1733 delete_hits: 0,
1734 incr_hits: 0,
1735 decr_hits: 0,
1736 cas_hits: 0,
1737 cas_badval: 0,
1738 touch_hits: 0,
1739 },
1740 Slab {
1741 id: 7,
1742 chunk_size: 384,
1743 chunks_per_page: 2730,
1744 total_pages: 1,
1745 total_chunks: 2730,
1746 used_chunks: 5,
1747 free_chunks: 2725,
1748 get_hits: 4792,
1749 cmd_set: 520,
1750 delete_hits: 0,
1751 incr_hits: 0,
1752 decr_hits: 0,
1753 cas_hits: 0,
1754 cas_badval: 0,
1755 touch_hits: 0,
1756 },
1757 ];
1758
1759 assert_eq!(expected, res.slabs);
1760 }
1761
1762 #[tokio::test]
1767 async fn test_memcached_items_empty() {
1768 let (_rx, mut client) = client!();
1769 let res = client.items().await.unwrap();
1770
1771 assert!(res.is_empty());
1772 }
1773
1774 #[tokio::test]
1775 async fn test_memcached_items_error() {
1776 let (_rx, mut client) = client!("ERROR Too many open connections\r\n");
1777 let res = client.items().await;
1778
1779 assert!(res.is_err());
1780 let err = res.unwrap_err();
1781 assert_eq!(ErrorKind::Protocol, err.kind());
1782 }
1783
1784 #[tokio::test]
1785 async fn test_memcached_items_success() {
1786 let (_rx, mut client) = client!(
1787 "STAT items:39:number 3\r\n",
1788 "STAT items:39:number_hot 0\r\n",
1789 "STAT items:39:number_warm 1\r\n",
1790 "STAT items:39:number_cold 2\r\n",
1791 "STAT items:39:age_hot 0\r\n",
1792 "STAT items:39:age_warm 7\r\n",
1793 "STAT items:39:age 8\r\n",
1794 "STAT items:39:mem_requested 1535788\r\n",
1795 "STAT items:39:evicted 1646\r\n",
1796 "STAT items:39:evicted_nonzero 1646\r\n",
1797 "STAT items:39:evicted_time 0\r\n",
1798 "STAT items:39:outofmemory 9\r\n",
1799 "STAT items:39:tailrepairs 0\r\n",
1800 "STAT items:39:reclaimed 13\r\n",
1801 "STAT items:39:expired_unfetched 4\r\n",
1802 "STAT items:39:evicted_unfetched 202\r\n",
1803 "STAT items:39:evicted_active 6\r\n",
1804 "STAT items:39:crawler_reclaimed 0\r\n",
1805 "STAT items:39:crawler_items_checked 40\r\n",
1806 "STAT items:39:lrutail_reflocked 17365\r\n",
1807 "STAT items:39:moves_to_cold 8703\r\n",
1808 "STAT items:39:moves_to_warm 7285\r\n",
1809 "STAT items:39:moves_within_lru 3651\r\n",
1810 "STAT items:39:direct_reclaims 1949\r\n",
1811 "STAT items:39:hits_to_hot 894\r\n",
1812 "STAT items:39:hits_to_warm 4079\r\n",
1813 "STAT items:39:hits_to_cold 8043\r\n",
1814 "STAT items:39:hits_to_temp 0\r\n",
1815 "END\r\n",
1816 );
1817 let res = client.items().await.unwrap();
1818
1819 let expected = SlabItems {
1820 items: vec![SlabItem {
1821 id: 39,
1822 number: 3,
1823 number_hot: 0,
1824 number_warm: 1,
1825 number_cold: 2,
1826 age_hot: 0,
1827 age_warm: 7,
1828 age: 8,
1829 mem_requested: 1535788,
1830 evicted: 1646,
1831 evicted_nonzero: 1646,
1832 evicted_time: 0,
1833 out_of_memory: 9,
1834 tail_repairs: 0,
1835 reclaimed: 13,
1836 expired_unfetched: 4,
1837 evicted_unfetched: 202,
1838 evicted_active: 6,
1839 crawler_reclaimed: 0,
1840 crawler_items_checked: 40,
1841 lrutail_reflocked: 17365,
1842 moves_to_cold: 8703,
1843 moves_to_warm: 7285,
1844 moves_within_lru: 3651,
1845 direct_reclaims: 1949,
1846 hits_to_hot: 894,
1847 hits_to_warm: 4079,
1848 hits_to_cold: 8043,
1849 hits_to_temp: 0,
1850 }],
1851 };
1852
1853 assert_eq!(expected, res);
1854 }
1855
1856 #[tokio::test]
1861 async fn test_memcached_metas_empty() {
1862 let (_rx, mut client) = client!();
1863 let res = client.metas().await.unwrap();
1864
1865 assert!(res.is_empty());
1866 }
1867
1868 #[tokio::test]
1869 async fn test_memcached_metas_error() {
1870 let (_rx, mut client) = client!("BUSY crawler is busy\r\n",);
1871 let res = client.metas().await;
1872
1873 assert!(res.is_err());
1874 let err = res.unwrap_err();
1875 assert_eq!(ErrorKind::Protocol, err.kind());
1876 }
1877
1878 #[tokio::test]
1879 async fn test_memcached_metas_success() {
1880 let (_rx, mut client) = client!(
1881 "key=memcached%2Fmurmur3_hash.c exp=1687216956 la=1687216656 cas=259502 fetch=yes cls=17 size=2912\r\n",
1882 "key=memcached%2Fmd5.h exp=1687216956 la=1687216656 cas=259731 fetch=yes cls=17 size=3593\r\n",
1883 "END\r\n",
1884 );
1885 let res = client.metas().await.unwrap();
1886
1887 let expected = vec![
1888 Meta {
1889 key: "memcached/murmur3_hash.c".to_string(),
1890 expires: 1687216956,
1891 size: 2912,
1892 },
1893 Meta {
1894 key: "memcached/md5.h".to_string(),
1895 expires: 1687216956,
1896 size: 3593,
1897 },
1898 ];
1899
1900 assert_eq!(expected, res);
1901 }
1902}