mcmc_rs/
lib.rs

1use std::collections::HashMap;
2
3use crc32fast;
4use smol::io::{self, BufReader};
5use smol::net::{TcpStream, UdpSocket, unix::UnixStream};
6use smol::prelude::*;
7
/// Selects the optional argument appended to the `stats` command.
pub enum StatsArg {
    /// No argument: sends plain `stats`.
    Empty,
    /// Sends `stats settings`.
    Settings,
    /// Sends `stats items`.
    Items,
    /// Sends `stats sizes`.
    Sizes,
    /// Sends `stats slabs`.
    Slabs,
    /// Sends `stats conns`.
    Conns,
}
16
/// Mode value sent with the `slabs automove` command.
pub enum SlabsAutomoveArg {
    /// Sends `slabs automove 0`.
    Zero,
    /// Sends `slabs automove 1`.
    One,
    /// Sends `slabs automove 2`.
    Two,
}
22
/// Switch sent with the `lru_crawler` command.
pub enum LruCrawlerArg {
    /// Sends `lru_crawler enable`.
    Enable,
    /// Sends `lru_crawler disable`.
    Disable,
}
27
/// Target of the `lru_crawler crawl` command.
pub enum LruCrawlerCrawlArg<'a> {
    /// Class ids, sent as a comma-separated list.
    Classids(&'a [usize]),
    /// Sends the literal `all`.
    All,
}
32
/// Target of the `lru_crawler metadump` command.
pub enum LruCrawlerMetadumpArg<'a> {
    /// Class ids, sent as a comma-separated list.
    Classids(&'a [usize]),
    /// Sends the literal `all`.
    All,
    /// Sends the literal `hash`.
    Hash,
}
38
/// Target of the `lru_crawler mgdump` command.
pub enum LruCrawlerMgdumpArg<'a> {
    /// Class ids, sent as a comma-separated list.
    Classids(&'a [usize]),
    /// Sends the literal `all`.
    All,
    /// Sends the literal `hash`.
    Hash,
}
44
/// One item parsed from a `VALUE` block of a retrieval response.
#[derive(Debug, PartialEq)]
pub struct Item {
    /// Key taken from the `VALUE` header line.
    pub key: String,
    /// Flags field of the `VALUE` header line.
    pub flags: u32,
    /// Optional trailing field of the `VALUE` header line; `None` when the
    /// header has no field after the byte count.
    pub cas_unique: Option<u64>,
    /// Raw payload, exactly as many bytes as the header announced.
    pub data_block: Vec<u8>,
}
52
/// A single parsed result produced while reading the replies of a pipeline
/// of commands (see `execute_cmd`).
#[derive(Debug, PartialEq)]
pub enum PipelineResponse {
    /// Outcome of a storage, `delete` or `touch` command.
    Bool(bool),
    /// One item returned by a retrieval command (`get`, `gets`, `gat`, `gats`).
    Item(Item),
    /// Version string returned by `version`.
    String(String),
    /// Result of `me`; `None` when the server answered `EN`.
    OptionString(Option<String>),
    /// Lines collected from `lru_crawler metadump` / `lru_crawler mgdump`.
    VecString(Vec<String>),
    /// Commands that yield no value (auth, `quit`, `shutdown`, `mn`, and the
    /// commands acknowledged with `OK`).
    Unit(()),
    /// Result of `incr` / `decr`.
    Value(Option<u64>),
    /// Name/value pairs returned by `stats`.
    HashMap(HashMap<String, String>),
}
64
65async fn parse_storage_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
66    s: &mut S,
67    noreply: bool,
68) -> io::Result<bool> {
69    if noreply {
70        return Ok(true);
71    };
72    let mut line = String::new();
73    s.read_line(&mut line).await?;
74    match line.as_str() {
75        "STORED\r\n" => Ok(true),
76        "NOT_STORED\r\n" | "EXISTS\r\n" | "NOT_FOUND\r\n" => Ok(false),
77        _ => Err(io::Error::other(line)),
78    }
79}
80
81async fn parse_retrieval_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
82    s: &mut S,
83) -> io::Result<Vec<Item>> {
84    let mut items = Vec::new();
85    let mut line = String::new();
86    s.read_line(&mut line).await?;
87    while line.starts_with("VALUE") {
88        let mut split = line.split(' ');
89        split.next();
90        let (key, flags, bytes, cas_unique) = (
91            split.next().unwrap().to_string(),
92            split.next().unwrap().parse::<u32>().unwrap(),
93            split.next().unwrap().trim_end().parse::<usize>().unwrap(),
94            split.next().map(|x| x.trim_end().parse::<u64>().unwrap()),
95        );
96        let mut data_block = vec![0; bytes];
97        s.read_exact(&mut data_block).await?;
98        s.read_line(&mut String::new()).await?;
99        items.push(Item {
100            key,
101            flags,
102            cas_unique,
103            data_block,
104        });
105        line.clear();
106        s.read_line(&mut line).await?;
107    }
108    if line == "END\r\n" {
109        Ok(items)
110    } else {
111        Err(io::Error::other(line))
112    }
113}
114
115async fn parse_version_rp<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<String> {
116    let mut line = String::new();
117    let n = s.read_line(&mut line).await?;
118    if line.starts_with("VERSION") {
119        Ok(line[8..n - 2].to_string())
120    } else {
121        Err(io::Error::other(line))
122    }
123}
124
125async fn parse_ok_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
126    s: &mut S,
127    noreply: bool,
128) -> io::Result<()> {
129    if noreply {
130        return Ok(());
131    };
132    let mut line = String::new();
133    s.read_line(&mut line).await?;
134    if line == "OK\r\n" {
135        Ok(())
136    } else {
137        Err(io::Error::other(line))
138    }
139}
140
141async fn parse_delete_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
142    s: &mut S,
143    noreply: bool,
144) -> io::Result<bool> {
145    if noreply {
146        return Ok(true);
147    };
148    let mut line = String::new();
149    s.read_line(&mut line).await?;
150    match line.as_str() {
151        "DELETED\r\n" => Ok(true),
152        "NOT_FOUND\r\n" => Ok(false),
153        _ => Err(io::Error::other(line)),
154    }
155}
156
157async fn parse_auth_rp<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<()> {
158    let mut line = String::new();
159    s.read_line(&mut line).await?;
160    match line.as_str() {
161        "STORED\r\n" => Ok(()),
162        _ => Err(io::Error::other(line)),
163    }
164}
165
166async fn parse_incr_decr_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
167    s: &mut S,
168    noreply: bool,
169) -> io::Result<Option<u64>> {
170    if noreply {
171        return Ok(None);
172    };
173    let mut line = String::new();
174    s.read_line(&mut line).await?;
175    if line == "NOT_FOUND\r\n" {
176        return Ok(None);
177    };
178    match line.trim_end().parse::<u64>() {
179        Ok(v) => Ok(Some(v)),
180        Err(_) => Err(io::Error::other(line)),
181    }
182}
183
184async fn parse_touch_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
185    s: &mut S,
186    noreply: bool,
187) -> io::Result<bool> {
188    if noreply {
189        return Ok(true);
190    };
191    let mut line = String::new();
192    s.read_line(&mut line).await?;
193    if line == "TOUCHED\r\n" {
194        Ok(true)
195    } else if line == "NOT_FOUND\r\n" {
196        Ok(false)
197    } else {
198        Err(io::Error::other(line))
199    }
200}
201
202async fn parse_stats_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
203    s: &mut S,
204) -> io::Result<HashMap<String, String>> {
205    let mut lines = s.lines();
206    let mut items = HashMap::new();
207    while let Some(line) = lines.next().await {
208        let data = line?;
209        if data.starts_with("STAT") {
210            let mut split = data.split(' ');
211            split.next();
212            let (k, v) = (
213                split.next().unwrap().to_string(),
214                split.next().unwrap().trim_end().to_string(),
215            );
216            items.insert(k, v);
217        } else if data == "END" {
218            break;
219        } else {
220            return Err(io::Error::other(data));
221        };
222    }
223    Ok(items)
224}
225
226async fn parse_lru_crawler_metadump_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
227    s: &mut S,
228) -> io::Result<Vec<String>> {
229    let mut line = String::new();
230    s.read_line(&mut line).await?;
231    let mut items = Vec::new();
232    while line.starts_with("key=") {
233        items.push(line.trim_end().to_string());
234        line.clear();
235        s.read_line(&mut line).await?;
236    }
237    if line == "END\r\n" {
238        Ok(items)
239    } else {
240        Err(io::Error::other(line))
241    }
242}
243
244async fn parse_lru_crawler_mgdump_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
245    s: &mut S,
246) -> io::Result<Vec<String>> {
247    let mut line = String::new();
248    s.read_line(&mut line).await?;
249    let mut items = Vec::new();
250    while line.starts_with("mg ") {
251        let mut split = line.split(' ');
252        split.next();
253        items.push(split.next().unwrap().trim_end().to_string());
254        line.clear();
255        s.read_line(&mut line).await?;
256    }
257    if line == "EN\r\n" {
258        Ok(items)
259    } else {
260        Err(io::Error::other(line))
261    }
262}
263
264async fn parse_mn_rp<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<()> {
265    let mut line = String::new();
266    s.read_line(&mut line).await?;
267    if line == "MN\r\n" {
268        Ok(())
269    } else {
270        Err(io::Error::other(line))
271    }
272}
273
274async fn parse_me_r<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<Option<String>> {
275    let mut line = String::new();
276    s.read_line(&mut line).await?;
277    if line == "EN\r\n" {
278        Ok(None)
279    } else if line.starts_with("ME") {
280        Ok(Some(line[3..line.len() - 2].to_string()))
281    } else {
282        Err(io::Error::other(line))
283    }
284}
285
/// Serializes a storage command.
///
/// Layout:
/// `<command_name> <key> <flags> <exptime> <len>[ <cas_unique>][ noreply]\r\n<data_block>\r\n`
/// where `<len>` is the length of `data_block` in bytes.
fn build_storage_cmd(
    command_name: &[u8],
    key: &[u8],
    flags: u32,
    exptime: i64,
    cas_unique: Option<u64>,
    noreply: bool,
    data_block: &[u8],
) -> Vec<u8> {
    // Everything after the key on the header line is plain text.
    let mut tail = format!(" {flags} {exptime} {}", data_block.len());
    if let Some(cas) = cas_unique {
        tail.push_str(&format!(" {cas}"));
    }
    if noreply {
        tail.push_str(" noreply");
    }

    let mut cmd = Vec::new();
    cmd.extend_from_slice(command_name);
    cmd.push(b' ');
    cmd.extend_from_slice(key);
    cmd.extend_from_slice(tail.as_bytes());
    cmd.extend_from_slice(b"\r\n");
    cmd.extend_from_slice(data_block);
    cmd.extend_from_slice(b"\r\n");
    cmd
}
318
/// Serializes a retrieval command:
/// `<command_name> [<exptime> ]<key1> <key2> ...\r\n`.
fn build_retrieval_cmd(command_name: &[u8], exptime: Option<i64>, keys: &[&[u8]]) -> Vec<u8> {
    let mut cmd = command_name.to_vec();
    cmd.push(b' ');
    if let Some(t) = exptime {
        cmd.extend_from_slice(t.to_string().as_bytes());
        cmd.push(b' ');
    }
    for (idx, key) in keys.iter().enumerate() {
        // Keys are separated by a single space; none after the last one.
        if idx > 0 {
            cmd.push(b' ');
        }
        cmd.extend_from_slice(key);
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
333
/// Returns the wire form of the `version` command.
fn build_version_cmd() -> &'static [u8] {
    "version\r\n".as_bytes()
}
337
/// Returns the wire form of the `quit` command.
fn build_quit_cmd() -> &'static [u8] {
    "quit\r\n".as_bytes()
}
341
/// Returns the wire form of `shutdown`, with the `graceful` argument when
/// `graceful` is true.
fn build_shutdown_cmd(graceful: bool) -> &'static [u8] {
    let text = if graceful {
        "shutdown graceful\r\n"
    } else {
        "shutdown\r\n"
    };
    text.as_bytes()
}
349
/// Serializes `cache_memlimit <limit>[ noreply]\r\n`.
fn build_cache_memlimit_cmd(limit: usize, noreply: bool) -> Vec<u8> {
    let suffix = if noreply { " noreply" } else { "" };
    format!("cache_memlimit {limit}{suffix}\r\n").into_bytes()
}
354
/// Serializes `flush_all[ <exptime>][ noreply]\r\n`.
fn build_flush_all_cmd(exptime: Option<i64>, noreply: bool) -> Vec<u8> {
    let mut cmd = String::from("flush_all");
    if let Some(delay) = exptime {
        cmd.push_str(&format!(" {delay}"));
    }
    if noreply {
        cmd.push_str(" noreply");
    }
    cmd.push_str("\r\n");
    cmd.into_bytes()
}
363
/// Serializes `delete <key>[ noreply]\r\n`.
fn build_delete_cmd(key: &[u8], noreply: bool) -> Vec<u8> {
    let mut cmd = b"delete ".to_vec();
    cmd.extend_from_slice(key);
    if noreply {
        cmd.extend_from_slice(b" noreply");
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
368
/// Serializes the authentication request:
/// `set _ _ _ <len>\r\n<username> <password>\r\n`, where `<len>` is the byte
/// length of `<username> <password>`.
fn build_auth_cmd(username: &[u8], password: &[u8]) -> Vec<u8> {
    let credentials = [username, b" ", password].concat();
    let mut cmd = format!("set _ _ _ {}\r\n", credentials.len()).into_bytes();
    cmd.extend_from_slice(&credentials);
    cmd.extend_from_slice(b"\r\n");
    cmd
}
381
/// Serializes `<command_name> <key> <value>[ noreply]\r\n`.
fn build_incr_decr_cmd(command_name: &[u8], key: &[u8], value: u64, noreply: bool) -> Vec<u8> {
    let mut cmd = command_name.to_vec();
    cmd.push(b' ');
    cmd.extend_from_slice(key);
    cmd.push(b' ');
    cmd.extend_from_slice(value.to_string().as_bytes());
    if noreply {
        cmd.extend_from_slice(b" noreply");
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
395
/// Serializes `touch <key> <exptime>[ noreply]\r\n`.
fn build_touch_cmd(key: &[u8], exptime: i64, noreply: bool) -> Vec<u8> {
    let mut cmd = b"touch ".to_vec();
    cmd.extend_from_slice(key);
    cmd.push(b' ');
    cmd.extend_from_slice(exptime.to_string().as_bytes());
    if noreply {
        cmd.extend_from_slice(b" noreply");
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
408
409fn build_stats_cmd(arg: StatsArg) -> Vec<u8> {
410    let a: &[u8] = match arg {
411        StatsArg::Empty => b"",
412        StatsArg::Settings => b" settings",
413        StatsArg::Items => b" items",
414        StatsArg::Sizes => b" sizes",
415        StatsArg::Slabs => b" slabs",
416        StatsArg::Conns => b" conns",
417    };
418    [b"stats", a, b"\r\n"].concat()
419}
420
421fn build_slabs_automove_cmd(arg: SlabsAutomoveArg) -> Vec<u8> {
422    let a: &[u8] = match arg {
423        SlabsAutomoveArg::Zero => b"0",
424        SlabsAutomoveArg::One => b"1",
425        SlabsAutomoveArg::Two => b"2",
426    };
427    [b"slabs automove ", a, b"\r\n"].concat()
428}
429
430fn build_lru_crawler_cmd(arg: LruCrawlerArg) -> &'static [u8] {
431    match arg {
432        LruCrawlerArg::Enable => b"lru_crawler enable\r\n",
433        LruCrawlerArg::Disable => b"lru_crawler disable\r\n",
434    }
435}
436
/// Serializes `lru_crawler sleep <microseconds>\r\n`.
fn build_lru_clawler_sleep_cmd(microseconds: usize) -> Vec<u8> {
    format!("lru_crawler sleep {microseconds}\r\n").into_bytes()
}
445
/// Serializes `lru_crawler tocrawl <arg>\r\n`.
fn build_lru_crawler_tocrawl_cmd(arg: u32) -> Vec<u8> {
    format!("lru_crawler tocrawl {arg}\r\n").into_bytes()
}
449
450fn build_lru_clawler_crawl_cmd(arg: LruCrawlerCrawlArg) -> Vec<u8> {
451    let a = match arg {
452        LruCrawlerCrawlArg::Classids(ids) => ids
453            .iter()
454            .map(|x| x.to_string())
455            .collect::<Vec<_>>()
456            .join(","),
457        LruCrawlerCrawlArg::All => "all".to_string(),
458    };
459    [b"lru_crawler crawl ", a.as_bytes(), b"\r\n"].concat()
460}
461
/// Serializes `slabs reassign <source_class> <dest_class>\r\n`.
fn build_slabs_reassign_cmd(source_class: usize, dest_class: usize) -> Vec<u8> {
    format!("slabs reassign {source_class} {dest_class}\r\n").into_bytes()
}
472
473fn build_lru_clawler_metadump_cmd(arg: LruCrawlerMetadumpArg) -> Vec<u8> {
474    let a = match arg {
475        LruCrawlerMetadumpArg::Classids(ids) => ids
476            .iter()
477            .map(|x| x.to_string())
478            .collect::<Vec<_>>()
479            .join(","),
480        LruCrawlerMetadumpArg::All => "all".to_string(),
481        LruCrawlerMetadumpArg::Hash => "hash".to_string(),
482    };
483    [b"lru_crawler metadump ", a.as_bytes(), b"\r\n"].concat()
484}
485
486fn build_lru_clawler_mgdump_cmd(arg: LruCrawlerMgdumpArg) -> Vec<u8> {
487    let a = match arg {
488        LruCrawlerMgdumpArg::Classids(ids) => ids
489            .iter()
490            .map(|x| x.to_string())
491            .collect::<Vec<_>>()
492            .join(","),
493        LruCrawlerMgdumpArg::All => "all".to_string(),
494        LruCrawlerMgdumpArg::Hash => "hash".to_string(),
495    };
496    [b"lru_crawler mgdump ", a.as_bytes(), b"\r\n"].concat()
497}
498
/// Returns the wire form of the `mn` command.
fn build_mn_cmd() -> &'static [u8] {
    "mn\r\n".as_bytes()
}
502
/// Serializes `me <key>\r\n`.
fn build_me_cmd(key: &[u8]) -> Vec<u8> {
    let mut cmd = b"me ".to_vec();
    cmd.extend_from_slice(key);
    cmd.extend_from_slice(b"\r\n");
    cmd
}
506
507async fn version_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<String> {
508    s.write_all(build_version_cmd()).await?;
509    s.flush().await?;
510    parse_version_rp(s).await
511}
512
513async fn quit_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<()> {
514    s.write_all(build_quit_cmd()).await?;
515    s.flush().await?;
516    Ok(())
517}
518
519async fn shutdown_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
520    s: &mut S,
521    graceful: bool,
522) -> io::Result<()> {
523    s.write_all(build_shutdown_cmd(graceful)).await?;
524    s.flush().await?;
525    Ok(())
526}
527
528async fn cache_memlimit_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
529    s: &mut S,
530    limit: usize,
531    noreply: bool,
532) -> io::Result<()> {
533    s.write_all(&build_cache_memlimit_cmd(limit, noreply))
534        .await?;
535    s.flush().await?;
536    parse_ok_rp(s, noreply).await
537}
538
539async fn flush_all_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
540    s: &mut S,
541    exptime: Option<i64>,
542    noreply: bool,
543) -> io::Result<()> {
544    s.write_all(&build_flush_all_cmd(exptime, noreply)).await?;
545    s.flush().await?;
546    parse_ok_rp(s, noreply).await
547}
548
549async fn storage_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
550    s: &mut S,
551    command_name: &[u8],
552    key: &[u8],
553    flags: u32,
554    exptime: i64,
555    cas_unique: Option<u64>,
556    noreply: bool,
557    data_block: &[u8],
558) -> io::Result<bool> {
559    s.write_all(&build_storage_cmd(
560        command_name,
561        key,
562        flags,
563        exptime,
564        cas_unique,
565        noreply,
566        data_block,
567    ))
568    .await?;
569    s.flush().await?;
570    parse_storage_rp(s, noreply).await
571}
572
573async fn delete_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
574    s: &mut S,
575    key: &[u8],
576    noreply: bool,
577) -> io::Result<bool> {
578    s.write_all(&build_delete_cmd(key, noreply)).await?;
579    s.flush().await?;
580    parse_delete_rp(s, noreply).await
581}
582
583async fn auth_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
584    s: &mut S,
585    username: &[u8],
586    password: &[u8],
587) -> io::Result<()> {
588    s.write_all(&build_auth_cmd(username, password)).await?;
589    s.flush().await?;
590    parse_auth_rp(s).await
591}
592
593async fn incr_decr_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
594    s: &mut S,
595    command_name: &[u8],
596    key: &[u8],
597    value: u64,
598    noreply: bool,
599) -> io::Result<Option<u64>> {
600    s.write_all(&build_incr_decr_cmd(command_name, key, value, noreply))
601        .await?;
602    s.flush().await?;
603    parse_incr_decr_rp(s, noreply).await
604}
605
606async fn touch_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
607    s: &mut S,
608    key: &[u8],
609    exptime: i64,
610    noreply: bool,
611) -> io::Result<bool> {
612    s.write_all(&build_touch_cmd(key, exptime, noreply)).await?;
613    s.flush().await?;
614    parse_touch_rp(s, noreply).await
615}
616
617async fn retrieval_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
618    s: &mut S,
619    command_name: &[u8],
620    exptime: Option<i64>,
621    keys: &[&[u8]],
622) -> io::Result<Vec<Item>> {
623    s.write_all(&build_retrieval_cmd(command_name, exptime, keys))
624        .await?;
625    s.flush().await?;
626    parse_retrieval_rp(s).await
627}
628
629async fn stats_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
630    s: &mut S,
631    arg: StatsArg,
632) -> io::Result<HashMap<String, String>> {
633    s.write_all(&build_stats_cmd(arg)).await?;
634    s.flush().await?;
635    parse_stats_rp(s).await
636}
637
638async fn slabs_automove_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
639    s: &mut S,
640    arg: SlabsAutomoveArg,
641) -> io::Result<()> {
642    s.write_all(&build_slabs_automove_cmd(arg)).await?;
643    s.flush().await?;
644    parse_ok_rp(s, false).await
645}
646
647async fn lru_crawler_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
648    s: &mut S,
649    arg: LruCrawlerArg,
650) -> io::Result<()> {
651    s.write_all(build_lru_crawler_cmd(arg)).await?;
652    s.flush().await?;
653    parse_ok_rp(s, false).await
654}
655
656async fn lru_crawler_sleep_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
657    s: &mut S,
658    microseconds: usize,
659) -> io::Result<()> {
660    s.write_all(&build_lru_clawler_sleep_cmd(microseconds))
661        .await?;
662    s.flush().await?;
663    parse_ok_rp(s, false).await
664}
665
666async fn lru_crawler_tocrawl_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
667    s: &mut S,
668    arg: u32,
669) -> io::Result<()> {
670    s.write_all(&build_lru_crawler_tocrawl_cmd(arg)).await?;
671    s.flush().await?;
672    parse_ok_rp(s, false).await
673}
674
675async fn lru_crawler_crawl_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
676    s: &mut S,
677    arg: LruCrawlerCrawlArg<'_>,
678) -> io::Result<()> {
679    s.write_all(&build_lru_clawler_crawl_cmd(arg)).await?;
680    s.flush().await?;
681    parse_ok_rp(s, false).await
682}
683
684async fn slabs_reassign_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
685    s: &mut S,
686    source_class: usize,
687    dest_class: usize,
688) -> io::Result<()> {
689    s.write_all(&build_slabs_reassign_cmd(source_class, dest_class))
690        .await?;
691    s.flush().await?;
692    parse_ok_rp(s, false).await
693}
694
695async fn lru_crawler_metadump_cmd<S>(
696    s: &mut S,
697    arg: LruCrawlerMetadumpArg<'_>,
698) -> io::Result<Vec<String>>
699where
700    S: AsyncBufRead + AsyncWrite + Unpin,
701{
702    s.write_all(&build_lru_clawler_metadump_cmd(arg)).await?;
703    s.flush().await?;
704    parse_lru_crawler_metadump_rp(s).await
705}
706
707async fn lru_crawler_mgdump_cmd<S>(
708    s: &mut S,
709    arg: LruCrawlerMgdumpArg<'_>,
710) -> io::Result<Vec<String>>
711where
712    S: AsyncBufRead + AsyncWrite + Unpin,
713{
714    s.write_all(&build_lru_clawler_mgdump_cmd(arg)).await?;
715    s.flush().await?;
716    parse_lru_crawler_mgdump_rp(s).await
717}
718
719async fn mn_cmd<S>(s: &mut S) -> io::Result<()>
720where
721    S: AsyncBufRead + AsyncWrite + Unpin,
722{
723    s.write_all(build_mn_cmd()).await?;
724    s.flush().await?;
725    parse_mn_rp(s).await
726}
727
728async fn me_cmd<S>(s: &mut S, key: &[u8]) -> io::Result<Option<String>>
729where
730    S: AsyncBufRead + AsyncWrite + Unpin,
731{
732    s.write_all(&build_me_cmd(key)).await?;
733    s.flush().await?;
734    parse_me_r(s).await
735}
736
/// Sends a batch of pre-built commands in one write and then reads their
/// replies in order.
///
/// All commands are concatenated, written and flushed before any reply is
/// read. Each command is then matched by its leading bytes to choose the
/// reply parser, and the parsed value is appended to the result.
///
/// Notes on the shape of the result:
/// - a retrieval command contributes one `PipelineResponse::Item` per item
///   returned, so it may add zero or several entries;
/// - `quit` and `shutdown` add a `Unit` entry without reading anything;
/// - a command that matches none of the known prefixes adds no entry and
///   no reply is read for it.
///
/// The first parser error or I/O error aborts the whole call.
async fn execute_cmd<S>(s: &mut S, cmds: &[Vec<u8>]) -> io::Result<Vec<PipelineResponse>>
where
    S: AsyncBufRead + AsyncWrite + Unpin,
{
    s.write_all(&cmds.concat()).await?;
    s.flush().await?;
    let mut result = Vec::new();
    for cmd in cmds {
        if cmd.starts_with(b"gets ")
            || cmd.starts_with(b"get ")
            || cmd.starts_with(b"gats ")
            || cmd.starts_with(b"gat ")
        {
            parse_retrieval_rp(s)
                .await?
                .into_iter()
                .for_each(|x| result.push(PipelineResponse::Item(x)))
        // The auth request shares the `set ` prefix, so it must be tested
        // before the generic storage branch below.
        } else if cmd.starts_with(b"set _ _ _ ") {
            result.push(PipelineResponse::Unit(parse_auth_rp(s).await?));
        } else if cmd.starts_with(b"set ")
            || cmd.starts_with(b"add ")
            || cmd.starts_with(b"replace ")
            || cmd.starts_with(b"append ")
            || cmd.starts_with(b"prepend ")
            || cmd.starts_with(b"cas ")
        {
            // Only the header line (up to the first `\r`) is inspected for
            // `noreply`; the data block that follows is ignored.
            let mut split = cmd.split(|x| x == &b'\r');
            let n = split.next().unwrap();
            result.push(PipelineResponse::Bool(
                parse_storage_rp(s, n.ends_with(b"noreply")).await?,
            ))
        } else if cmd == build_version_cmd() {
            result.push(PipelineResponse::String(parse_version_rp(s).await?));
        } else if cmd.starts_with(b"delete ") {
            result.push(PipelineResponse::Bool(
                parse_delete_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
            ))
        } else if cmd.starts_with(b"incr ") || cmd.starts_with(b"decr ") {
            result.push(PipelineResponse::Value(
                parse_incr_decr_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
            ))
        } else if cmd.starts_with(b"touch ") {
            result.push(PipelineResponse::Bool(
                parse_touch_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
            ))
        // No reply is read for `quit` / `shutdown`.
        } else if cmd == build_quit_cmd() || cmd.starts_with(b"shutdown") {
            result.push(PipelineResponse::Unit(()));
        } else if cmd.starts_with(b"flush_all") || cmd.starts_with(b"cache_memlimit ") {
            result.push(PipelineResponse::Unit(
                parse_ok_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
            ))
        // These commands are always acknowledged with `OK` (no `noreply` form here).
        } else if cmd.starts_with(b"slabs automove ")
            || cmd.starts_with(b"slabs reassign ")
            || cmd.starts_with(b"lru_crawler sleep ")
            || cmd.starts_with(b"lru_crawler crawl ")
            || cmd.starts_with(b"lru_crawler tocrawl ")
            || cmd == build_lru_crawler_cmd(LruCrawlerArg::Enable)
            || cmd == build_lru_crawler_cmd(LruCrawlerArg::Disable)
        {
            result.push(PipelineResponse::Unit(parse_ok_rp(s, false).await?))
        } else if cmd == build_mn_cmd() {
            result.push(PipelineResponse::Unit(parse_mn_rp(s).await?))
        } else if cmd.starts_with(b"stats") {
            result.push(PipelineResponse::HashMap(parse_stats_rp(s).await?))
        } else if cmd.starts_with(b"lru_crawler metadump ") {
            result.push(PipelineResponse::VecString(
                parse_lru_crawler_metadump_rp(s).await?,
            ))
        } else if cmd.starts_with(b"lru_crawler mgdump ") {
            result.push(PipelineResponse::VecString(
                parse_lru_crawler_mgdump_rp(s).await?,
            ))
        } else if cmd.starts_with(b"me ") {
            result.push(PipelineResponse::OptionString(parse_me_r(s).await?))
        }
    }
    Ok(result)
}
815
/// A client connection over one of the supported transports.
pub enum Connection {
    /// Buffered TCP stream.
    Tcp(BufReader<TcpStream>),
    /// Buffered Unix domain socket stream.
    Unix(BufReader<UnixStream>),
    /// UDP socket. NOTE(review): the methods visible in this part of the file
    /// are `todo!()` for this variant.
    Udp(UdpSocket),
}
821impl Connection {
822    /// # Example
823    ///
824    /// ```rust
825    /// # use mcmc_rs::Connection;
826    /// # use smol::{io, block_on};
827    /// #
828    /// # block_on(async {
829    /// let mut conn = Connection::default().await?;
830    /// #     Ok::<(), io::Error>(())
831    /// # }).unwrap()
832    /// ```
833    pub async fn default() -> io::Result<Self> {
834        Ok(Connection::Tcp(BufReader::new(
835            TcpStream::connect("127.0.0.1:11211").await?,
836        )))
837    }
838
839    /// # Example
840    ///
841    /// ```rust
842    /// # use mcmc_rs::Connection;
843    /// # use smol::{io, block_on};
844    /// #
845    /// # block_on(async {
846    /// let mut conn = Connection::tcp_connect("127.0.0.1:11211").await?;
847    /// #     Ok::<(), io::Error>(())
848    /// # }).unwrap()
849    /// ```
850    pub async fn tcp_connect(addr: &str) -> io::Result<Self> {
851        Ok(Connection::Tcp(BufReader::new(
852            TcpStream::connect(addr).await?,
853        )))
854    }
855
856    /// # Example
857    ///
858    /// ```rust
859    /// # use mcmc_rs::Connection;
860    /// # use smol::{io, block_on};
861    /// #
862    /// # block_on(async {
863    /// let mut conn = Connection::unix_connect("/tmp/memcached.sock").await?;
864    /// #     Ok::<(), io::Error>(())
865    /// # }).unwrap()
866    /// ```
867    pub async fn unix_connect(path: &str) -> io::Result<Self> {
868        Ok(Connection::Unix(BufReader::new(
869            UnixStream::connect(path).await?,
870        )))
871    }
872
873    /// # Example
874    ///
875    /// ```rust
876    /// # use mcmc_rs::Connection;
877    /// # use smol::{io, block_on};
878    /// #
879    /// # block_on(async {
880    /// #     let mut conn = Connection::default().await?;
881    /// let result = conn.version().await?;
882    /// assert!(result.chars().any(|x| x.is_numeric()));
883    /// #     Ok::<(), io::Error>(())
884    /// # }).unwrap()
885    /// ```
886    pub async fn version(&mut self) -> io::Result<String> {
887        match self {
888            Connection::Tcp(s) => version_cmd(s).await,
889            Connection::Unix(s) => version_cmd(s).await,
890            Connection::Udp(_s) => todo!(),
891        }
892    }
893
894    /// # Example
895    ///
896    /// ```rust
897    /// # use mcmc_rs::Connection;
898    /// # use smol::{io, block_on};
899    /// #
900    /// # block_on(async {
901    /// let mut conn = Connection::default().await?;
902    /// conn.quit().await?;
903    /// #     Ok::<(), io::Error>(())
904    /// # }).unwrap()
905    /// ```
906    pub async fn quit(&mut self) -> io::Result<()> {
907        match self {
908            Connection::Tcp(s) => quit_cmd(s).await,
909            Connection::Unix(s) => quit_cmd(s).await,
910            Connection::Udp(_s) => todo!(),
911        }
912    }
913
914    /// # Example
915    ///
916    /// ```rust
917    /// # use mcmc_rs::Connection;
918    /// # use smol::{io, block_on};
919    /// #
920    /// # block_on(async {
921    /// let mut conn = Connection::tcp_connect("127.0.0.1:11213").await?;
922    /// conn.shutdown(true).await?;
923    /// #     Ok::<(), io::Error>(())
924    /// # }).unwrap()
925    /// ```
926    pub async fn shutdown(&mut self, graceful: bool) -> io::Result<()> {
927        match self {
928            Connection::Tcp(s) => shutdown_cmd(s, graceful).await,
929            Connection::Unix(s) => shutdown_cmd(s, graceful).await,
930            Connection::Udp(_s) => todo!(),
931        }
932    }
933
934    /// # Example
935    ///
936    /// ```rust
937    /// # use mcmc_rs::Connection;
938    /// # use smol::{io, block_on};
939    /// #
940    /// # block_on(async {
941    /// let mut conn = Connection::default().await?;
942    /// conn.cache_memlimit(10, true).await?;
943    /// #     Ok::<(), io::Error>(())
944    /// # }).unwrap()
945    /// ```
946    pub async fn cache_memlimit(&mut self, limit: usize, noreply: bool) -> io::Result<()> {
947        match self {
948            Connection::Tcp(s) => cache_memlimit_cmd(s, limit, noreply).await,
949            Connection::Unix(s) => cache_memlimit_cmd(s, limit, noreply).await,
950            Connection::Udp(_s) => todo!(),
951        }
952    }
953
954    /// # Example
955    ///
956    /// ```rust
957    /// # use mcmc_rs::Connection;
958    /// # use smol::{io, block_on};
959    /// #
960    /// # block_on(async {
961    /// let mut conn = Connection::default().await?;
962    /// conn.flush_all(Some(999), true).await?;
963    /// #     Ok::<(), io::Error>(())
964    /// # }).unwrap()
965    /// ```
966    pub async fn flush_all(&mut self, exptime: Option<i64>, noreply: bool) -> io::Result<()> {
967        match self {
968            Connection::Tcp(s) => flush_all_cmd(s, exptime, noreply).await,
969            Connection::Unix(s) => flush_all_cmd(s, exptime, noreply).await,
970            Connection::Udp(_s) => todo!(),
971        }
972    }
973
974    /// # Example
975    ///
976    /// ```rust
977    /// # use mcmc_rs::Connection;
978    /// # use smol::{io, block_on};
979    /// #
980    /// # block_on(async {
981    /// let mut conn = Connection::default().await?;
982    /// let result = conn.set(b"key", 0, -1, true, b"value").await?;
983    /// assert!(result);
984    /// #     Ok::<(), io::Error>(())
985    /// # }).unwrap()
986    /// ```
987    pub async fn set(
988        &mut self,
989        key: impl AsRef<[u8]>,
990        flags: u32,
991        exptime: i64,
992        noreply: bool,
993        data_block: impl AsRef<[u8]>,
994    ) -> io::Result<bool> {
995        match self {
996            Connection::Tcp(s) => {
997                storage_cmd(
998                    s,
999                    b"set",
1000                    key.as_ref(),
1001                    flags,
1002                    exptime,
1003                    None,
1004                    noreply,
1005                    data_block.as_ref(),
1006                )
1007                .await
1008            }
1009            Connection::Unix(s) => {
1010                storage_cmd(
1011                    s,
1012                    b"set",
1013                    key.as_ref(),
1014                    flags,
1015                    exptime,
1016                    None,
1017                    noreply,
1018                    data_block.as_ref(),
1019                )
1020                .await
1021            }
1022            Connection::Udp(_s) => todo!(),
1023        }
1024    }
1025
1026    /// # Example
1027    ///
1028    /// ```rust
1029    /// # use mcmc_rs::Connection;
1030    /// # use smol::{io, block_on};
1031    /// #
1032    /// # block_on(async {
1033    /// #     let mut conn = Connection::default().await?;
1034    /// let result = conn.add(b"key", 0, -1, true, b"value").await?;
1035    /// assert!(result);
1036    /// #     Ok::<(), io::Error>(())
1037    /// # }).unwrap()
1038    /// ```
1039    pub async fn add(
1040        &mut self,
1041        key: impl AsRef<[u8]>,
1042        flags: u32,
1043        exptime: i64,
1044        noreply: bool,
1045        data_block: impl AsRef<[u8]>,
1046    ) -> io::Result<bool> {
1047        match self {
1048            Connection::Tcp(s) => {
1049                storage_cmd(
1050                    s,
1051                    b"add",
1052                    key.as_ref(),
1053                    flags,
1054                    exptime,
1055                    None,
1056                    noreply,
1057                    data_block.as_ref(),
1058                )
1059                .await
1060            }
1061            Connection::Unix(s) => {
1062                storage_cmd(
1063                    s,
1064                    b"add",
1065                    key.as_ref(),
1066                    flags,
1067                    exptime,
1068                    None,
1069                    noreply,
1070                    data_block.as_ref(),
1071                )
1072                .await
1073            }
1074            Connection::Udp(_s) => todo!(),
1075        }
1076    }
1077
1078    /// # Example
1079    ///
1080    /// ```rust
1081    /// # use mcmc_rs::Connection;
1082    /// # use smol::{io, block_on};
1083    /// #
1084    /// # block_on(async {
1085    /// #     let mut conn = Connection::default().await?;
1086    /// let result = conn.replace(b"key", 0, -1, true, b"value").await?;
1087    /// assert!(result);
1088    /// #     Ok::<(), io::Error>(())
1089    /// # }).unwrap()
1090    /// ```
1091    pub async fn replace(
1092        &mut self,
1093        key: impl AsRef<[u8]>,
1094        flags: u32,
1095        exptime: i64,
1096        noreply: bool,
1097        data_block: impl AsRef<[u8]>,
1098    ) -> io::Result<bool> {
1099        match self {
1100            Connection::Tcp(s) => {
1101                storage_cmd(
1102                    s,
1103                    b"replace",
1104                    key.as_ref(),
1105                    flags,
1106                    exptime,
1107                    None,
1108                    noreply,
1109                    data_block.as_ref(),
1110                )
1111                .await
1112            }
1113            Connection::Unix(s) => {
1114                storage_cmd(
1115                    s,
1116                    b"replace",
1117                    key.as_ref(),
1118                    flags,
1119                    exptime,
1120                    None,
1121                    noreply,
1122                    data_block.as_ref(),
1123                )
1124                .await
1125            }
1126            Connection::Udp(_s) => todo!(),
1127        }
1128    }
1129
1130    /// # Example
1131    ///
1132    /// ```rust
1133    /// # use mcmc_rs::Connection;
1134    /// # use smol::{io, block_on};
1135    /// #
1136    /// # block_on(async {
1137    /// #     let mut conn = Connection::default().await?;
1138    /// let result = conn.append(b"key", 0, -1, true, b"value").await?;
1139    /// assert!(result);
1140    /// #     Ok::<(), io::Error>(())
1141    /// # }).unwrap()
1142    /// ```
1143    pub async fn append(
1144        &mut self,
1145        key: impl AsRef<[u8]>,
1146        flags: u32,
1147        exptime: i64,
1148        noreply: bool,
1149        data_block: impl AsRef<[u8]>,
1150    ) -> io::Result<bool> {
1151        match self {
1152            Connection::Tcp(s) => {
1153                storage_cmd(
1154                    s,
1155                    b"append",
1156                    key.as_ref(),
1157                    flags,
1158                    exptime,
1159                    None,
1160                    noreply,
1161                    data_block.as_ref(),
1162                )
1163                .await
1164            }
1165            Connection::Unix(s) => {
1166                storage_cmd(
1167                    s,
1168                    b"append",
1169                    key.as_ref(),
1170                    flags,
1171                    exptime,
1172                    None,
1173                    noreply,
1174                    data_block.as_ref(),
1175                )
1176                .await
1177            }
1178            Connection::Udp(_s) => todo!(),
1179        }
1180    }
1181
1182    /// # Example
1183    ///
1184    /// ```rust
1185    /// # use mcmc_rs::Connection;
1186    /// # use smol::{io, block_on};
1187    /// #
1188    /// # block_on(async {
1189    /// # let mut conn = Connection::default().await?;
1190    /// let result = conn.prepend(b"key", 0, -1, true, b"value").await?;
1191    /// assert!(result);
1192    /// # Ok::<(), io::Error>(())
1193    /// # }).unwrap()
1194    /// ```
1195    pub async fn prepend(
1196        &mut self,
1197        key: impl AsRef<[u8]>,
1198        flags: u32,
1199        exptime: i64,
1200        noreply: bool,
1201        data_block: impl AsRef<[u8]>,
1202    ) -> io::Result<bool> {
1203        match self {
1204            Connection::Tcp(s) => {
1205                storage_cmd(
1206                    s,
1207                    b"prepend",
1208                    key.as_ref(),
1209                    flags,
1210                    exptime,
1211                    None,
1212                    noreply,
1213                    data_block.as_ref(),
1214                )
1215                .await
1216            }
1217            Connection::Unix(s) => {
1218                storage_cmd(
1219                    s,
1220                    b"prepend",
1221                    key.as_ref(),
1222                    flags,
1223                    exptime,
1224                    None,
1225                    noreply,
1226                    data_block.as_ref(),
1227                )
1228                .await
1229            }
1230            Connection::Udp(_s) => todo!(),
1231        }
1232    }
1233
1234    /// # Example
1235    ///
1236    /// ```rust
1237    /// # use mcmc_rs::Connection;
1238    /// # use smol::{io, block_on};
1239    /// #
1240    /// # block_on(async {
1241    /// # let mut conn = Connection::default().await?;
1242    /// let result = conn.cas(b"key", 0, -1, 0, true, b"value").await?;
1243    /// assert!(result);
1244    /// # Ok::<(), io::Error>(())
1245    /// # }).unwrap()
1246    /// ```
1247    pub async fn cas(
1248        &mut self,
1249        key: impl AsRef<[u8]>,
1250        flags: u32,
1251        exptime: i64,
1252        cas_unique: u64,
1253        noreply: bool,
1254        data_block: impl AsRef<[u8]>,
1255    ) -> io::Result<bool> {
1256        match self {
1257            Connection::Tcp(s) => {
1258                storage_cmd(
1259                    s,
1260                    b"cas",
1261                    key.as_ref(),
1262                    flags,
1263                    exptime,
1264                    Some(cas_unique),
1265                    noreply,
1266                    data_block.as_ref(),
1267                )
1268                .await
1269            }
1270            Connection::Unix(s) => {
1271                storage_cmd(
1272                    s,
1273                    b"cas",
1274                    key.as_ref(),
1275                    flags,
1276                    exptime,
1277                    Some(cas_unique),
1278                    noreply,
1279                    data_block.as_ref(),
1280                )
1281                .await
1282            }
1283            Connection::Udp(_s) => todo!(),
1284        }
1285    }
1286
1287    /// # Example
1288    ///
1289    /// ```rust
1290    /// # use mcmc_rs::Connection;
1291    /// # use smol::{io, block_on};
1292    /// #
1293    /// # block_on(async {
1294    /// #     let mut conn = Connection::tcp_connect("127.0.0.1:11212").await?;
1295    /// conn.auth(b"a", b"a").await?;
1296    /// #     Ok::<(), io::Error>(())
1297    /// # }).unwrap()
1298    /// ```
1299    pub async fn auth(
1300        &mut self,
1301        username: impl AsRef<[u8]>,
1302        password: impl AsRef<[u8]>,
1303    ) -> io::Result<()> {
1304        match self {
1305            Connection::Tcp(s) => auth_cmd(s, username.as_ref(), password.as_ref()).await,
1306            Connection::Unix(s) => auth_cmd(s, username.as_ref(), password.as_ref()).await,
1307            Connection::Udp(_s) => todo!(),
1308        }
1309    }
1310
1311    /// # Example
1312    ///
1313    /// ```rust
1314    /// # use mcmc_rs::Connection;
1315    /// # use smol::{io, block_on};
1316    /// #
1317    /// # block_on(async {
1318    /// # let mut conn = Connection::default().await?;
1319    /// let result = conn.delete(b"key", true).await?;
1320    /// assert!(result);
1321    /// # Ok::<(), io::Error>(())
1322    /// # }).unwrap()
1323    /// ```
1324    pub async fn delete(&mut self, key: impl AsRef<[u8]>, noreply: bool) -> io::Result<bool> {
1325        match self {
1326            Connection::Tcp(s) => delete_cmd(s, key.as_ref(), noreply).await,
1327            Connection::Unix(s) => delete_cmd(s, key.as_ref(), noreply).await,
1328            Connection::Udp(_s) => todo!(),
1329        }
1330    }
1331
1332    /// # Example
1333    ///
1334    /// ```rust
1335    /// # use mcmc_rs::Connection;
1336    /// # use smol::{io, block_on};
1337    /// # block_on(async {
1338    /// # let mut conn = Connection::default().await?;
1339    /// let result = conn.incr(b"key", 1, true).await?;
1340    /// assert!(result.is_none());
1341    /// # Ok::<(), io::Error>(())
1342    /// # }).unwrap()
1343    /// ```
1344    pub async fn incr(
1345        &mut self,
1346        key: impl AsRef<[u8]>,
1347        value: u64,
1348        noreply: bool,
1349    ) -> io::Result<Option<u64>> {
1350        match self {
1351            Connection::Tcp(s) => incr_decr_cmd(s, b"incr", key.as_ref(), value, noreply).await,
1352            Connection::Unix(s) => incr_decr_cmd(s, b"incr", key.as_ref(), value, noreply).await,
1353            Connection::Udp(_s) => todo!(),
1354        }
1355    }
1356
1357    /// # Example
1358    ///
1359    /// ```rust
1360    /// # use mcmc_rs::Connection;
1361    /// # use smol::{io, block_on};
1362    /// #
1363    /// # block_on(async {
1364    /// # let mut conn = Connection::default().await?;
1365    /// let result = conn.decr(b"key", 1, true).await?;
1366    /// assert!(result.is_none());
1367    /// # Ok::<(), io::Error>(())
1368    /// # }).unwrap()
1369    /// ```
1370    pub async fn decr(
1371        &mut self,
1372        key: impl AsRef<[u8]>,
1373        value: u64,
1374        noreply: bool,
1375    ) -> io::Result<Option<u64>> {
1376        match self {
1377            Connection::Tcp(s) => incr_decr_cmd(s, b"decr", key.as_ref(), value, noreply).await,
1378            Connection::Unix(s) => incr_decr_cmd(s, b"decr", key.as_ref(), value, noreply).await,
1379            Connection::Udp(_s) => todo!(),
1380        }
1381    }
1382
1383    /// # Example
1384    ///
1385    /// ```rust
1386    /// # use mcmc_rs::Connection;
1387    /// # use smol::{io, block_on};
1388    /// #
1389    /// # block_on(async {
1390    /// # let mut conn = Connection::default().await?;
1391    /// let result = conn.touch(b"key", -1, true).await?;
1392    /// assert!(result);
1393    /// # Ok::<(), io::Error>(())
1394    /// # }).unwrap()
1395    /// ```
1396    pub async fn touch(
1397        &mut self,
1398        key: impl AsRef<[u8]>,
1399        exptime: i64,
1400        noreply: bool,
1401    ) -> io::Result<bool> {
1402        match self {
1403            Connection::Tcp(s) => touch_cmd(s, key.as_ref(), exptime, noreply).await,
1404            Connection::Unix(s) => touch_cmd(s, key.as_ref(), exptime, noreply).await,
1405            Connection::Udp(_s) => todo!(),
1406        }
1407    }
1408
1409    /// # Example
1410    ///
1411    /// ```rust
1412    /// use mcmc_rs::Connection;
1413    /// # use smol::{io, block_on};
1414    /// #
1415    /// # block_on(async {
1416    /// # let mut conn = Connection::default().await?;
1417    /// # assert!(conn.set(b"k1", 0, 0, false, b"v1").await?);
1418    /// let result = conn.get(b"k1").await?;
1419    /// assert_eq!(result.unwrap().key, "k1");
1420    /// # Ok::<(), io::Error>(())
1421    /// # }).unwrap()
1422    /// ```
1423    pub async fn get(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1424        match self {
1425            Connection::Tcp(s) => Ok(retrieval_cmd(s, b"get", None, &[key.as_ref()]).await?.pop()),
1426            Connection::Unix(s) => Ok(retrieval_cmd(s, b"get", None, &[key.as_ref()]).await?.pop()),
1427            Connection::Udp(_s) => todo!(),
1428        }
1429    }
1430
1431    /// # Example
1432    ///
1433    /// ```rust
1434    /// # use mcmc_rs::{Connection, Item};
1435    /// # use smol::{io, block_on};
1436    /// #
1437    /// # block_on(async {
1438    /// # let mut conn = Connection::default().await?;
1439    /// assert!(conn.set(b"k2", 0, 0, false, b"v2").await?);
1440    /// let result = conn.gets(b"k2").await?;
1441    /// assert_eq!(result.unwrap().key, "k2");
1442    /// # Ok::<(), io::Error>(())
1443    /// # }).unwrap()
1444    /// ```
1445    pub async fn gets(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1446        match self {
1447            Connection::Tcp(s) => Ok(retrieval_cmd(s, b"gets", None, &[key.as_ref()])
1448                .await?
1449                .pop()),
1450            Connection::Unix(s) => Ok(retrieval_cmd(s, b"gets", None, &[key.as_ref()])
1451                .await?
1452                .pop()),
1453            Connection::Udp(_s) => todo!(),
1454        }
1455    }
1456
1457    /// # Example
1458    ///
1459    /// ```rust
1460    /// # use mcmc_rs::{Connection, Item};
1461    /// # use smol::{io, block_on};
1462    /// #
1463    /// # block_on(async {
1464    /// # let mut conn = Connection::default().await?;
1465    /// assert!(conn.set(b"k3", 0, 0, false, b"v3").await?);
1466    /// let result = conn.gat(0, b"k3").await?;
1467    /// assert_eq!(result.unwrap().key, "k3");
1468    /// # Ok::<(), io::Error>(())
1469    /// # }).unwrap()
1470    /// ```
1471    pub async fn gat(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1472        match self {
1473            Connection::Tcp(s) => Ok(retrieval_cmd(s, b"gat", Some(exptime), &[key.as_ref()])
1474                .await?
1475                .pop()),
1476            Connection::Unix(s) => Ok(retrieval_cmd(s, b"gat", Some(exptime), &[key.as_ref()])
1477                .await?
1478                .pop()),
1479            Connection::Udp(_s) => todo!(),
1480        }
1481    }
1482
1483    /// # Example
1484    ///
1485    /// ```rust
1486    /// # use mcmc_rs::{Connection, Item};
1487    /// # use smol::{io, block_on};
1488    /// #
1489    /// # block_on(async {
1490    /// # let mut conn = Connection::default().await?;
1491    /// assert!(conn.set(b"k4", 0, 0, false, b"v4").await?);
1492    /// let result = conn.gats(0, b"k4").await?;
1493    /// assert_eq!(result.unwrap().key, "k4");
1494    /// # Ok::<(), io::Error>(())
1495    /// # }).unwrap()
1496    /// ```
1497    pub async fn gats(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1498        match self {
1499            Connection::Tcp(s) => Ok(retrieval_cmd(s, b"gats", Some(exptime), &[key.as_ref()])
1500                .await?
1501                .pop()),
1502            Connection::Unix(s) => Ok(retrieval_cmd(s, b"gats", Some(exptime), &[key.as_ref()])
1503                .await?
1504                .pop()),
1505            Connection::Udp(_s) => todo!(),
1506        }
1507    }
1508
1509    /// # Example
1510    ///
1511    /// ```rust
1512    /// # use mcmc_rs::Connection;
1513    /// # use smol::{io, block_on};
1514    /// #
1515    /// # block_on(async {
1516    /// # let mut conn = Connection::default().await?;
1517    /// assert!(conn.set(b"k8", 0, 0, false, b"v8").await?);
1518    /// let result = conn.get_multi(&[b"k8"]).await?;
1519    /// assert_eq!(result[0].key, "k8");
1520    /// # Ok::<(), io::Error>(())
1521    /// # }).unwrap()
1522    /// ```
1523    pub async fn get_multi(&mut self, keys: &[impl AsRef<[u8]>]) -> io::Result<Vec<Item>> {
1524        match self {
1525            Connection::Tcp(s) => {
1526                retrieval_cmd(
1527                    s,
1528                    b"get",
1529                    None,
1530                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1531                )
1532                .await
1533            }
1534            Connection::Unix(s) => {
1535                retrieval_cmd(
1536                    s,
1537                    b"get",
1538                    None,
1539                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1540                )
1541                .await
1542            }
1543            Connection::Udp(_s) => todo!(),
1544        }
1545    }
1546
1547    /// # Example
1548    ///
1549    /// ```rust
1550    /// # use mcmc_rs::Connection;
1551    /// # use smol::{io, block_on};
1552    /// #
1553    /// # block_on(async {
1554    /// # let mut conn = Connection::default().await?;
1555    /// assert!(conn.set(b"k7", 0, 0, false, b"v7").await?);
1556    /// let result = conn.gets_multi(&[b"k7"]).await?;
1557    /// assert_eq!(result[0].key, "k7");
1558    /// # Ok::<(), io::Error>(())
1559    /// # }).unwrap()
1560    /// ```
1561    pub async fn gets_multi(&mut self, keys: &[impl AsRef<[u8]>]) -> io::Result<Vec<Item>> {
1562        match self {
1563            Connection::Tcp(s) => {
1564                retrieval_cmd(
1565                    s,
1566                    b"gets",
1567                    None,
1568                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1569                )
1570                .await
1571            }
1572            Connection::Unix(s) => {
1573                retrieval_cmd(
1574                    s,
1575                    b"gets",
1576                    None,
1577                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1578                )
1579                .await
1580            }
1581            Connection::Udp(_s) => todo!(),
1582        }
1583    }
1584
1585    /// # Example
1586    ///
1587    /// ```rust
1588    /// # use mcmc_rs::Connection;
1589    /// # use smol::{io, block_on};
1590    /// #
1591    /// # block_on(async {
1592    /// # let mut conn = Connection::default().await?;
1593    /// assert!(conn.set(b"k6", 0, 0, false, b"v6").await?);
1594    /// let result = conn.gat_multi(0, &[b"k6"]).await?;
1595    /// assert_eq!(result[0].key, "k6");
1596    /// # Ok::<(), io::Error>(())
1597    /// # }).unwrap()
1598    /// ```
1599    pub async fn gat_multi(
1600        &mut self,
1601        exptime: i64,
1602        keys: &[impl AsRef<[u8]>],
1603    ) -> io::Result<Vec<Item>> {
1604        match self {
1605            Connection::Tcp(s) => {
1606                retrieval_cmd(
1607                    s,
1608                    b"gat",
1609                    Some(exptime),
1610                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1611                )
1612                .await
1613            }
1614            Connection::Unix(s) => {
1615                retrieval_cmd(
1616                    s,
1617                    b"gat",
1618                    Some(exptime),
1619                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1620                )
1621                .await
1622            }
1623            Connection::Udp(_s) => todo!(),
1624        }
1625    }
1626
1627    /// # Example
1628    ///
1629    /// ```rust
1630    /// # use mcmc_rs::Connection;
1631    /// # use smol::{io, block_on};
1632    /// #
1633    /// # block_on(async {
1634    /// let mut conn = Connection::default().await?;
1635    /// assert!(conn.set(b"k5", 0, 0, false, b"v5").await?);
1636    /// let result = conn.gats_multi(0, &[b"k5"]).await?;
1637    /// assert_eq!(result[0].key, "k5");
1638    /// #     Ok::<(), io::Error>(())
1639    /// # }).unwrap()
1640    /// ```
1641    pub async fn gats_multi(
1642        &mut self,
1643        exptime: i64,
1644        keys: &[impl AsRef<[u8]>],
1645    ) -> io::Result<Vec<Item>> {
1646        match self {
1647            Connection::Tcp(s) => {
1648                retrieval_cmd(
1649                    s,
1650                    b"gats",
1651                    Some(exptime),
1652                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1653                )
1654                .await
1655            }
1656            Connection::Unix(s) => {
1657                retrieval_cmd(
1658                    s,
1659                    b"gats",
1660                    Some(exptime),
1661                    &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1662                )
1663                .await
1664            }
1665            Connection::Udp(_s) => todo!(),
1666        }
1667    }
1668
1669    /// # Example
1670    ///
1671    /// ```rust
1672    /// use mcmc_rs::Connection;
1673    /// use mcmc_rs::StatsArg;
1674    /// # use smol::{io, block_on};
1675    /// #
1676    /// # block_on(async {
1677    /// let mut conn = Connection::default().await?;
1678    /// let result = conn.stats(StatsArg::Empty).await?;
1679    /// assert!(result.len() > 0);
1680    /// # Ok::<(), io::Error>(())
1681    /// # }).unwrap()
1682    /// ```
1683    pub async fn stats(&mut self, arg: StatsArg) -> io::Result<HashMap<String, String>> {
1684        match self {
1685            Connection::Tcp(s) => stats_cmd(s, arg).await,
1686            Connection::Unix(s) => stats_cmd(s, arg).await,
1687            Connection::Udp(_s) => todo!(),
1688        }
1689    }
1690
1691    /// # Example
1692    ///
1693    /// ```rust
1694    /// use mcmc_rs::Connection;
1695    /// use mcmc_rs::SlabsAutomoveArg;
1696    /// # use smol::{io, block_on};
1697    /// #
1698    /// # block_on(async {
1699    /// let mut conn = Connection::default().await?;
1700    /// conn.slabs_automove(SlabsAutomoveArg::Zero).await?;
1701    /// # Ok::<(), io::Error>(())
1702    /// # }).unwrap()
1703    /// ```
1704    pub async fn slabs_automove(&mut self, arg: SlabsAutomoveArg) -> io::Result<()> {
1705        match self {
1706            Connection::Tcp(s) => slabs_automove_cmd(s, arg).await,
1707            Connection::Unix(s) => slabs_automove_cmd(s, arg).await,
1708            Connection::Udp(_s) => todo!(),
1709        }
1710    }
1711
1712    /// # Example
1713    ///
1714    /// ```rust
1715    /// use mcmc_rs::{Connection, LruCrawlerArg};
1716    /// # use smol::{io, block_on};
1717    /// #
1718    /// # block_on(async {
1719    /// let mut conn = Connection::default().await?;
1720    /// let result = conn.lru_crawler(LruCrawlerArg::Enable).await;
1721    /// assert!(result.is_err());
1722    /// # Ok::<(), io::Error>(())
1723    /// # }).unwrap()
1724    /// ```
1725    pub async fn lru_crawler(&mut self, arg: LruCrawlerArg) -> io::Result<()> {
1726        match self {
1727            Connection::Tcp(s) => lru_crawler_cmd(s, arg).await,
1728            Connection::Unix(s) => lru_crawler_cmd(s, arg).await,
1729            Connection::Udp(_s) => todo!(),
1730        }
1731    }
1732
1733    /// # Example
1734    ///
1735    /// ```rust
1736    /// use mcmc_rs::Connection;
1737    /// # use smol::{io, block_on};
1738    /// #
1739    /// # block_on(async {
1740    /// let mut conn = Connection::default().await?;
1741    /// conn.lru_crawler_sleep(1_000_000).await?;
1742    /// # Ok::<(), io::Error>(())
1743    /// # }).unwrap()
1744    /// ```
1745    pub async fn lru_crawler_sleep(&mut self, microseconds: usize) -> io::Result<()> {
1746        match self {
1747            Connection::Tcp(s) => lru_crawler_sleep_cmd(s, microseconds).await,
1748            Connection::Unix(s) => lru_crawler_sleep_cmd(s, microseconds).await,
1749            Connection::Udp(_s) => todo!(),
1750        }
1751    }
1752
1753    /// # Example
1754    ///
1755    /// ```rust
1756    /// use mcmc_rs::Connection;
1757    /// # use smol::{io, block_on};
1758    /// #
1759    /// # block_on(async {
1760    /// let mut conn = Connection::default().await?;
1761    /// conn.lru_crawler_tocrawl(0).await?;
1762    /// # Ok::<(), io::Error>(())
1763    /// # }).unwrap()
1764    /// ```
1765    pub async fn lru_crawler_tocrawl(&mut self, arg: u32) -> io::Result<()> {
1766        match self {
1767            Connection::Tcp(s) => lru_crawler_tocrawl_cmd(s, arg).await,
1768            Connection::Unix(s) => lru_crawler_tocrawl_cmd(s, arg).await,
1769            Connection::Udp(_s) => todo!(),
1770        }
1771    }
1772
1773    /// # Example
1774    ///
1775    /// ```rust
1776    /// use mcmc_rs::{Connection, LruCrawlerCrawlArg};
1777    /// # use smol::{io, block_on};
1778    /// #
1779    /// # block_on(async {
1780    /// let mut conn = Connection::default().await?;
1781    /// conn.lru_crawler_crawl(LruCrawlerCrawlArg::All).await?;
1782    /// # Ok::<(), io::Error>(())
1783    /// # }).unwrap()
1784    /// ```
1785    pub async fn lru_crawler_crawl(&mut self, arg: LruCrawlerCrawlArg<'_>) -> io::Result<()> {
1786        match self {
1787            Connection::Tcp(s) => lru_crawler_crawl_cmd(s, arg).await,
1788            Connection::Unix(s) => lru_crawler_crawl_cmd(s, arg).await,
1789            Connection::Udp(_s) => todo!(),
1790        }
1791    }
1792
1793    /// # Example
1794    ///
1795    /// ```rust
1796    /// use mcmc_rs::Connection;
1797    /// # use smol::{io, block_on};
1798    /// #
1799    /// # block_on(async {
1800    /// let mut conn = Connection::default().await?;
1801    /// let result = conn.slabs_reassign(1, 2).await;
1802    /// assert!(result.is_err());
1803    /// # Ok::<(), io::Error>(())
1804    /// # }).unwrap()
1805    /// ```
1806    pub async fn slabs_reassign(
1807        &mut self,
1808        source_class: usize,
1809        dest_class: usize,
1810    ) -> io::Result<()> {
1811        match self {
1812            Connection::Tcp(s) => slabs_reassign_cmd(s, source_class, dest_class).await,
1813            Connection::Unix(s) => slabs_reassign_cmd(s, source_class, dest_class).await,
1814            Connection::Udp(_s) => todo!(),
1815        }
1816    }
1817
1818    /// # Example
1819    ///
1820    /// ```rust
1821    /// use mcmc_rs::{Connection, LruCrawlerMetadumpArg};
1822    /// # use smol::{io, block_on};
1823    /// #
1824    /// # block_on(async {
1825    /// let mut conn = Connection::default().await?;
1826    /// let result = conn.lru_crawler_metadump(LruCrawlerMetadumpArg::Classids(&[2])).await?;
1827    /// assert!(result.is_empty());
1828    /// # Ok::<(), io::Error>(())
1829    /// # }).unwrap()
1830    /// ```
1831    pub async fn lru_crawler_metadump(
1832        &mut self,
1833        arg: LruCrawlerMetadumpArg<'_>,
1834    ) -> io::Result<Vec<String>> {
1835        match self {
1836            Connection::Tcp(s) => lru_crawler_metadump_cmd(s, arg).await,
1837            Connection::Unix(s) => lru_crawler_metadump_cmd(s, arg).await,
1838            Connection::Udp(_s) => todo!(),
1839        }
1840    }
1841
1842    /// # Example
1843    ///
1844    /// ```rust
1845    /// use mcmc_rs::{Connection, LruCrawlerMgdumpArg};
1846    /// # use smol::{io, block_on};
1847    /// #
1848    /// # block_on(async {
1849    /// let mut conn = Connection::unix_connect("/tmp/memcached.sock").await?;
1850    /// let result = conn.lru_crawler_mgdump(LruCrawlerMgdumpArg::Classids(&[3])).await?;
1851    /// assert!(result.is_empty());
1852    /// # Ok::<(), io::Error>(())
1853    /// # }).unwrap()
1854    /// ```
1855    pub async fn lru_crawler_mgdump(
1856        &mut self,
1857        arg: LruCrawlerMgdumpArg<'_>,
1858    ) -> io::Result<Vec<String>> {
1859        match self {
1860            Connection::Tcp(s) => lru_crawler_mgdump_cmd(s, arg).await,
1861            Connection::Unix(s) => lru_crawler_mgdump_cmd(s, arg).await,
1862            Connection::Udp(_s) => todo!(),
1863        }
1864    }
1865
1866    /// # Example
1867    ///
1868    /// ```rust
1869    /// use mcmc_rs::Connection;
1870    /// # use smol::{io, block_on};
1871    /// #
1872    /// # block_on(async {
1873    /// let mut conn = Connection::default().await?;
1874    /// assert!(conn.mn().await.is_ok());
1875    /// # Ok::<(), io::Error>(())
1876    /// # }).unwrap()
1877    /// ```
1878    pub async fn mn(&mut self) -> io::Result<()> {
1879        match self {
1880            Connection::Tcp(s) => mn_cmd(s).await,
1881            Connection::Unix(s) => mn_cmd(s).await,
1882            Connection::Udp(_s) => todo!(),
1883        }
1884    }
1885
1886    /// # Example
1887    ///
1888    /// ```rust
1889    /// use mcmc_rs::Connection;
1890    /// # use smol::{io, block_on};
1891    /// #
1892    /// # block_on(async {
1893    /// let mut conn = Connection::default().await?;
1894    /// assert!(conn.set(b"k6", 0, 0, false, b"v6").await?);
1895    /// assert!(conn.me(b"k6").await?.is_some());
1896    /// # Ok::<(), io::Error>(())
1897    /// # }).unwrap()
1898    /// ```
1899    pub async fn me(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<String>> {
1900        match self {
1901            Connection::Tcp(s) => me_cmd(s, key.as_ref()).await,
1902            Connection::Unix(s) => me_cmd(s, key.as_ref()).await,
1903            Connection::Udp(_s) => todo!(),
1904        }
1905    }
1906
    /// Creates a [`Pipeline`] from this connection by handing `self` to
    /// `Pipeline::new`.
    pub fn pipeline(&mut self) -> Pipeline {
        Pipeline::new(self)
    }
1910}
1911
/// Client wrapping a list of [`Connection`]s; its keyed methods index the
/// list with `crc32fast::hash(key) % len` to choose the connection to use.
pub struct ClientCrc32(Vec<Connection>);
1913impl ClientCrc32 {
    /// Builds a client that owns the given connections, stored in the order
    /// they are passed.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mcmc_rs::{Connection, ClientCrc32};
    /// # use smol::{io, block_on};
    /// #
    /// # block_on(async {
    /// let mut client = ClientCrc32::new(
    ///     vec![
    ///     Connection::default().await?,
    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
    ///     ]
    /// );
    /// # Ok::<(), io::Error>(())
    /// # }).unwrap()
    /// ```
    pub fn new(conns: Vec<Connection>) -> Self {
        Self(conns)
    }
1933
1934    /// # Example
1935    ///
1936    /// ```rust
1937    /// use mcmc_rs::{Connection, ClientCrc32};
1938    /// # use smol::{io, block_on};
1939    /// #
1940    /// # block_on(async {
1941    /// let mut client = ClientCrc32::new(
1942    ///     vec![
1943    ///     Connection::default().await?,
1944    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
1945    ///     ]
1946    /// );
1947    ///
1948    /// assert!(client.set(b"k7", 0, 0, false, b"v7").await?);
1949    /// assert_eq!(client.get(b"k7").await?.unwrap().key, "k7");
1950    /// # Ok::<(), io::Error>(())
1951    /// # }).unwrap()
1952    /// ```
1953    pub async fn get(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1954        let size = self.0.len();
1955        self.0[crc32fast::hash(key.as_ref()) as usize % size]
1956            .get(key.as_ref())
1957            .await
1958    }
1959
1960    /// # Example
1961    ///
1962    /// ```rust
1963    /// use mcmc_rs::{Connection, ClientCrc32};
1964    /// # use smol::{io, block_on};
1965    /// #
1966    /// # block_on(async {
1967    /// let mut client = ClientCrc32::new(
1968    ///     vec![
1969    ///     Connection::default().await?,
1970    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
1971    ///     ]
1972    /// );
1973    ///
1974    /// assert!(client.set(b"k8", 0, 0, false, b"v8").await?);
1975    /// assert_eq!(client.gets(b"k8").await?.unwrap().key, "k8");
1976    /// # Ok::<(), io::Error>(())
1977    /// # }).unwrap()
1978    /// ```
1979    pub async fn gets(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1980        let size = self.0.len();
1981        self.0[crc32fast::hash(key.as_ref()) as usize % size]
1982            .gets(key.as_ref())
1983            .await
1984    }
1985
1986    /// # Example
1987    ///
1988    /// ```rust
1989    /// # use mcmc_rs::{Connection, Item};
1990    /// # use smol::{io, block_on};
1991    /// #
1992    /// # block_on(async {
1993    /// # let mut conn = Connection::default().await?;
1994    /// assert!(conn.set(b"k9", 0, 0, false, b"v9").await?);
1995    /// let result = conn.gat(0, b"k9").await?;
1996    /// assert_eq!(result.unwrap().key, "k9");
1997    /// # Ok::<(), io::Error>(())
1998    /// # }).unwrap()
1999    /// ```
2000    pub async fn gat(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
2001        let size = self.0.len();
2002        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2003            .gat(exptime, key.as_ref())
2004            .await
2005    }
2006
2007    /// # Example
2008    ///
2009    /// ```rust
2010    /// # use mcmc_rs::{Connection, Item};
2011    /// # use smol::{io, block_on};
2012    /// #
2013    /// # block_on(async {
2014    /// # let mut conn = Connection::default().await?;
2015    /// assert!(conn.set(b"k10", 0, 0, false, b"v10").await?);
2016    /// let result = conn.gats(0, b"k10").await?;
2017    /// assert_eq!(result.unwrap().key, "k10");
2018    /// # Ok::<(), io::Error>(())
2019    /// # }).unwrap()
2020    /// ```
2021    pub async fn gats(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
2022        let size = self.0.len();
2023        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2024            .gats(exptime, key.as_ref())
2025            .await
2026    }
2027
2028    /// # Example
2029    ///
2030    /// ```rust
2031    /// use mcmc_rs::{Connection, ClientCrc32};
2032    /// # use smol::{io, block_on};
2033    /// #
2034    /// # block_on(async {
2035    /// let mut client = ClientCrc32::new(
2036    ///     vec![
2037    ///     Connection::default().await?,
2038    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2039    ///     ]
2040    /// );
2041    ///
2042    /// assert!(client.set(b"key", 0, -1, true, b"value").await?);
2043    /// # Ok::<(), io::Error>(())
2044    /// # }).unwrap()
2045    /// ```
2046    pub async fn set(
2047        &mut self,
2048        key: impl AsRef<[u8]>,
2049        flags: u32,
2050        exptime: i64,
2051        noreply: bool,
2052        data_block: impl AsRef<[u8]>,
2053    ) -> io::Result<bool> {
2054        let size = self.0.len();
2055        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2056            .set(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2057            .await
2058    }
2059
2060    /// # Example
2061    ///
2062    /// ```rust
2063    /// use mcmc_rs::{Connection, ClientCrc32};
2064    /// # use smol::{io, block_on};
2065    /// #
2066    /// # block_on(async {
2067    /// let mut client = ClientCrc32::new(
2068    ///     vec![
2069    ///     Connection::default().await?,
2070    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2071    ///     ]
2072    /// );
2073    ///
2074    /// assert!(client.add(b"key", 0, -1, true, b"value").await?);
2075    /// # Ok::<(), io::Error>(())
2076    /// # }).unwrap()
2077    /// ```
2078    pub async fn add(
2079        &mut self,
2080        key: impl AsRef<[u8]>,
2081        flags: u32,
2082        exptime: i64,
2083        noreply: bool,
2084        data_block: impl AsRef<[u8]>,
2085    ) -> io::Result<bool> {
2086        let size = self.0.len();
2087        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2088            .add(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2089            .await
2090    }
2091
2092    /// # Example
2093    ///
2094    /// ```rust
2095    /// use mcmc_rs::{Connection, ClientCrc32};
2096    /// # use smol::{io, block_on};
2097    /// #
2098    /// # block_on(async {
2099    /// let mut client = ClientCrc32::new(
2100    ///     vec![
2101    ///     Connection::default().await?,
2102    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2103    ///     ]
2104    /// );
2105    ///
2106    /// assert!(client.replace(b"key", 0, -1, true, b"value").await?);
2107    /// # Ok::<(), io::Error>(())
2108    /// # }).unwrap()
2109    /// ```
2110    pub async fn replace(
2111        &mut self,
2112        key: impl AsRef<[u8]>,
2113        flags: u32,
2114        exptime: i64,
2115        noreply: bool,
2116        data_block: impl AsRef<[u8]>,
2117    ) -> io::Result<bool> {
2118        let size = self.0.len();
2119        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2120            .replace(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2121            .await
2122    }
2123
2124    /// # Example
2125    ///
2126    /// ```rust
2127    /// use mcmc_rs::{Connection, ClientCrc32};
2128    /// # use smol::{io, block_on};
2129    /// #
2130    /// # block_on(async {
2131    /// let mut client = ClientCrc32::new(
2132    ///     vec![
2133    ///     Connection::default().await?,
2134    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2135    ///     ]
2136    /// );
2137    ///
2138    /// assert!(client.append(b"key", 0, -1, true, b"value").await?);
2139    /// # Ok::<(), io::Error>(())
2140    /// # }).unwrap()
2141    /// ```
2142    pub async fn append(
2143        &mut self,
2144        key: impl AsRef<[u8]>,
2145        flags: u32,
2146        exptime: i64,
2147        noreply: bool,
2148        data_block: impl AsRef<[u8]>,
2149    ) -> io::Result<bool> {
2150        let size = self.0.len();
2151        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2152            .append(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2153            .await
2154    }
2155
2156    /// # Example
2157    ///
2158    /// ```rust
2159    /// use mcmc_rs::{Connection, ClientCrc32};
2160    /// # use smol::{io, block_on};
2161    /// #
2162    /// # block_on(async {
2163    /// let mut client = ClientCrc32::new(
2164    ///     vec![
2165    ///     Connection::default().await?,
2166    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2167    ///     ]
2168    /// );
2169    ///
2170    /// assert!(client.prepend(b"key", 0, -1, true, b"value").await?);
2171    /// # Ok::<(), io::Error>(())
2172    /// # }).unwrap()
2173    /// ```
2174    pub async fn prepend(
2175        &mut self,
2176        key: impl AsRef<[u8]>,
2177        flags: u32,
2178        exptime: i64,
2179        noreply: bool,
2180        data_block: impl AsRef<[u8]>,
2181    ) -> io::Result<bool> {
2182        let size = self.0.len();
2183        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2184            .prepend(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2185            .await
2186    }
2187
2188    /// # Example
2189    ///
2190    /// ```rust
2191    /// use mcmc_rs::{Connection, ClientCrc32};
2192    /// # use smol::{io, block_on};
2193    /// #
2194    /// # block_on(async {
2195    /// let mut client = ClientCrc32::new(
2196    ///     vec![
2197    ///     Connection::default().await?,
2198    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2199    ///     ]
2200    /// );
2201    ///
2202    /// assert!(client.cas(b"key", 0, -1, 0, true, b"value").await?);
2203    /// # Ok::<(), io::Error>(())
2204    /// # }).unwrap()
2205    /// ```
2206    pub async fn cas(
2207        &mut self,
2208        key: impl AsRef<[u8]>,
2209        flags: u32,
2210        exptime: i64,
2211        cas_unique: u64,
2212        noreply: bool,
2213        data_block: impl AsRef<[u8]>,
2214    ) -> io::Result<bool> {
2215        let size = self.0.len();
2216        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2217            .cas(
2218                key.as_ref(),
2219                flags,
2220                exptime,
2221                cas_unique,
2222                noreply,
2223                data_block.as_ref(),
2224            )
2225            .await
2226    }
2227
2228    /// # Example
2229    ///
2230    /// ```rust
2231    /// use mcmc_rs::{Connection, ClientCrc32};
2232    /// # use smol::{io, block_on};
2233    /// #
2234    /// # block_on(async {
2235    /// let mut client = ClientCrc32::new(
2236    ///     vec![
2237    ///     Connection::default().await?,
2238    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2239    ///     ]
2240    /// );
2241    ///
2242    /// assert!(client.delete(b"key", true).await?);
2243    /// # Ok::<(), io::Error>(())
2244    /// # }).unwrap()
2245    /// ```
2246    pub async fn delete(&mut self, key: impl AsRef<[u8]>, noreply: bool) -> io::Result<bool> {
2247        let size = self.0.len();
2248        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2249            .delete(key.as_ref(), noreply)
2250            .await
2251    }
2252
2253    /// # Example
2254    ///
2255    /// ```rust
2256    /// use mcmc_rs::{Connection, ClientCrc32};
2257    /// # use smol::{io, block_on};
2258    /// #
2259    /// # block_on(async {
2260    /// let mut client = ClientCrc32::new(
2261    ///     vec![
2262    ///     Connection::default().await?,
2263    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2264    ///     ]
2265    /// );
2266    ///
2267    /// assert!(client.incr(b"key", 1, true).await?.is_none());
2268    /// # Ok::<(), io::Error>(())
2269    /// # }).unwrap()
2270    /// ```
2271    pub async fn incr(
2272        &mut self,
2273        key: impl AsRef<[u8]>,
2274        value: u64,
2275        noreply: bool,
2276    ) -> io::Result<Option<u64>> {
2277        let size = self.0.len();
2278        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2279            .incr(key.as_ref(), value, noreply)
2280            .await
2281    }
2282
2283    /// # Example
2284    ///
2285    /// ```rust
2286    /// use mcmc_rs::{Connection, ClientCrc32};
2287    /// # use smol::{io, block_on};
2288    /// #
2289    /// # block_on(async {
2290    /// let mut client = ClientCrc32::new(
2291    ///     vec![
2292    ///     Connection::default().await?,
2293    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2294    ///     ]
2295    /// );
2296    ///
2297    /// assert!(client.decr(b"key", 1, true).await?.is_none());
2298    /// # Ok::<(), io::Error>(())
2299    /// # }).unwrap()
2300    /// ```
2301    pub async fn decr(
2302        &mut self,
2303        key: impl AsRef<[u8]>,
2304        value: u64,
2305        noreply: bool,
2306    ) -> io::Result<Option<u64>> {
2307        let size = self.0.len();
2308        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2309            .decr(key.as_ref(), value, noreply)
2310            .await
2311    }
2312
2313    /// # Example
2314    ///
2315    /// ```rust
2316    /// use mcmc_rs::{Connection, ClientCrc32};
2317    /// # use smol::{io, block_on};
2318    /// #
2319    /// # block_on(async {
2320    /// let mut client = ClientCrc32::new(
2321    ///     vec![
2322    ///     Connection::default().await?,
2323    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2324    ///     ]
2325    /// );
2326    ///
2327    /// assert!(client.touch(b"key", -1, true).await?);
2328    /// # Ok::<(), io::Error>(())
2329    /// # }).unwrap()
2330    /// ```
2331    pub async fn touch(
2332        &mut self,
2333        key: impl AsRef<[u8]>,
2334        exptime: i64,
2335        noreply: bool,
2336    ) -> io::Result<bool> {
2337        let size = self.0.len();
2338        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2339            .touch(key.as_ref(), exptime, noreply)
2340            .await
2341    }
2342
2343    /// # Example
2344    ///
2345    /// ```rust
2346    /// use mcmc_rs::{Connection, ClientCrc32};
2347    /// # use smol::{io, block_on};
2348    /// #
2349    /// # block_on(async {
2350    /// let mut client = ClientCrc32::new(
2351    ///     vec![
2352    ///     Connection::default().await?,
2353    ///     Connection::unix_connect("/tmp/memcached.sock").await?,
2354    ///     ]
2355    /// );
2356    /// assert!(client.set(b"k11", 0, 0, false, b"v11").await?);
2357    /// assert!(client.me(b"k11").await?.is_some());
2358    /// # Ok::<(), io::Error>(())
2359    /// # }).unwrap()
2360    /// ```
2361    pub async fn me(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<String>> {
2362        let size = self.0.len();
2363        self.0[crc32fast::hash(key.as_ref()) as usize % size]
2364            .me(key.as_ref())
2365            .await
2366    }
2367}
2368
2369pub struct Pipeline<'a>(&'a mut Connection, Vec<Vec<u8>>);
2370impl<'a> Pipeline<'a> {
2371    /// # Example
2372    ///
2373    /// ```rust
2374    /// use mcmc_rs::Connection;
2375    /// # use smol::{io, block_on};
2376    /// #
2377    /// # block_on(async {
2378    /// let mut conn = Connection::default().await?;
2379    /// conn.pipeline();
2380    /// # Ok::<(), io::Error>(())
2381    /// # }).unwrap()
2382    /// ```
2383    fn new(conn: &'a mut Connection) -> Self {
2384        Self(conn, Vec::new())
2385    }
2386
2387    /// # Example
2388    ///
2389    /// ```rust
2390    /// use mcmc_rs::Connection;
2391    /// # use smol::{io, block_on};
2392    /// #
2393    /// # block_on(async {
2394    /// let mut conn = Connection::default().await?;
2395    /// assert_eq!(conn.pipeline().execute().await?, []);
2396    /// # Ok::<(), io::Error>(())
2397    /// # }).unwrap()
2398    /// ```
2399    pub async fn execute(self) -> io::Result<Vec<PipelineResponse>> {
2400        if self.1.is_empty() {
2401            return Ok(Vec::new());
2402        };
2403        match self.0 {
2404            Connection::Tcp(s) => execute_cmd(s, &self.1).await,
2405            Connection::Unix(s) => execute_cmd(s, &self.1).await,
2406            Connection::Udp(_s) => todo!(),
2407        }
2408    }
2409
2410    /// # Example
2411    ///
2412    /// ```rust
2413    /// use mcmc_rs::Connection;
2414    /// # use smol::{io, block_on};
2415    /// #
2416    /// # block_on(async {
2417    /// let mut conn = Connection::default().await?;
2418    /// conn.pipeline().version();
2419    /// # Ok::<(), io::Error>(())
2420    /// # }).unwrap()
2421    /// ```
2422    pub fn version(mut self) -> Self {
2423        self.1.push(build_version_cmd().to_vec());
2424        self
2425    }
2426
2427    /// # Example
2428    ///
2429    /// ```rust
2430    /// use mcmc_rs::Connection;
2431    /// # use smol::{io, block_on};
2432    /// #
2433    /// # block_on(async {
2434    /// let mut conn = Connection::default().await?;
2435    /// conn.pipeline().quit();
2436    /// # Ok::<(), io::Error>(())
2437    /// # }).unwrap()
2438    /// ```
2439    pub fn quit(mut self) -> Self {
2440        self.1.push(build_quit_cmd().to_vec());
2441        self
2442    }
2443
2444    /// # Example
2445    ///
2446    /// ```rust
2447    /// use mcmc_rs::Connection;
2448    /// # use smol::{io, block_on};
2449    /// #
2450    /// # block_on(async {
2451    /// let mut conn = Connection::default().await?;
2452    /// conn.pipeline().shutdown(false);
2453    /// # Ok::<(), io::Error>(())
2454    /// # }).unwrap()
2455    /// ```
2456    pub fn shutdown(mut self, graceful: bool) -> Self {
2457        self.1.push(build_shutdown_cmd(graceful).to_vec());
2458        self
2459    }
2460
2461    /// # Example
2462    ///
2463    /// ```rust
2464    /// use mcmc_rs::Connection;
2465    /// # use smol::{io, block_on};
2466    /// #
2467    /// # block_on(async {
2468    /// let mut conn = Connection::default().await?;
2469    /// conn.pipeline().cache_memlimit(1, false);
2470    /// # Ok::<(), io::Error>(())
2471    /// # }).unwrap()
2472    /// ```
2473    pub fn cache_memlimit(mut self, limit: usize, noreply: bool) -> Self {
2474        self.1
2475            .push(build_cache_memlimit_cmd(limit, noreply).to_vec());
2476        self
2477    }
2478
2479    /// # Example
2480    ///
2481    /// ```rust
2482    /// use mcmc_rs::Connection;
2483    /// # use smol::{io, block_on};
2484    /// #
2485    /// # block_on(async {
2486    /// let mut conn = Connection::default().await?;
2487    /// conn.pipeline().flush_all(None, false);
2488    /// # Ok::<(), io::Error>(())
2489    /// # }).unwrap()
2490    /// ```
2491    pub fn flush_all(mut self, exptime: Option<i64>, noreply: bool) -> Self {
2492        self.1.push(build_flush_all_cmd(exptime, noreply).to_vec());
2493        self
2494    }
2495
2496    /// # Example
2497    ///
2498    /// ```rust
2499    /// use mcmc_rs::Connection;
2500    /// # use smol::{io, block_on};
2501    /// #
2502    /// # block_on(async {
2503    /// let mut conn = Connection::default().await?;
2504    /// conn.pipeline().set(b"key", 0, 0, false, b"value");
2505    /// # Ok::<(), io::Error>(())
2506    /// # }).unwrap()
2507    /// ```
2508    pub fn set(
2509        mut self,
2510        key: impl AsRef<[u8]>,
2511        flags: u32,
2512        exptime: i64,
2513        noreply: bool,
2514        data_block: impl AsRef<[u8]>,
2515    ) -> Self {
2516        self.1.push(build_storage_cmd(
2517            b"set",
2518            key.as_ref(),
2519            flags,
2520            exptime,
2521            None,
2522            noreply,
2523            data_block.as_ref(),
2524        ));
2525        self
2526    }
2527
2528    /// # Example
2529    ///
2530    /// ```rust
2531    /// use mcmc_rs::Connection;
2532    /// # use smol::{io, block_on};
2533    /// #
2534    /// # block_on(async {
2535    /// let mut conn = Connection::default().await?;
2536    /// conn.pipeline().add(b"key", 0, 0, false, b"value");
2537    /// # Ok::<(), io::Error>(())
2538    /// # }).unwrap()
2539    /// ```
2540    pub fn add(
2541        mut self,
2542        key: impl AsRef<[u8]>,
2543        flags: u32,
2544        exptime: i64,
2545        noreply: bool,
2546        data_block: impl AsRef<[u8]>,
2547    ) -> Self {
2548        self.1.push(build_storage_cmd(
2549            b"add",
2550            key.as_ref(),
2551            flags,
2552            exptime,
2553            None,
2554            noreply,
2555            data_block.as_ref(),
2556        ));
2557        self
2558    }
2559
2560    /// # Example
2561    ///
2562    /// ```rust
2563    /// use mcmc_rs::Connection;
2564    /// # use smol::{io, block_on};
2565    /// #
2566    /// # block_on(async {
2567    /// let mut conn = Connection::default().await?;
2568    /// conn.pipeline().replace(b"key", 0, 0, false, b"value");
2569    /// # Ok::<(), io::Error>(())
2570    /// # }).unwrap()
2571    /// ```
2572    pub fn replace(
2573        mut self,
2574        key: impl AsRef<[u8]>,
2575        flags: u32,
2576        exptime: i64,
2577        noreply: bool,
2578        data_block: impl AsRef<[u8]>,
2579    ) -> Self {
2580        self.1.push(build_storage_cmd(
2581            b"replace",
2582            key.as_ref(),
2583            flags,
2584            exptime,
2585            None,
2586            noreply,
2587            data_block.as_ref(),
2588        ));
2589        self
2590    }
2591
2592    /// # Example
2593    ///
2594    /// ```rust
2595    /// use mcmc_rs::Connection;
2596    /// # use smol::{io, block_on};
2597    /// #
2598    /// # block_on(async {
2599    /// let mut conn = Connection::default().await?;
2600    /// conn.pipeline().append(b"key", 0, 0, false, b"value");
2601    /// # Ok::<(), io::Error>(())
2602    /// # }).unwrap()
2603    /// ```
2604    pub fn append(
2605        mut self,
2606        key: impl AsRef<[u8]>,
2607        flags: u32,
2608        exptime: i64,
2609        noreply: bool,
2610        data_block: impl AsRef<[u8]>,
2611    ) -> Self {
2612        self.1.push(build_storage_cmd(
2613            b"append",
2614            key.as_ref(),
2615            flags,
2616            exptime,
2617            None,
2618            noreply,
2619            data_block.as_ref(),
2620        ));
2621        self
2622    }
2623
2624    /// # Example
2625    ///
2626    /// ```rust
2627    /// use mcmc_rs::Connection;
2628    /// # use smol::{io, block_on};
2629    /// #
2630    /// # block_on(async {
2631    /// let mut conn = Connection::default().await?;
2632    /// conn.pipeline().prepend(b"key", 0, 0, false, b"value");
2633    /// # Ok::<(), io::Error>(())
2634    /// # }).unwrap()
2635    /// ```
2636    pub fn prepend(
2637        mut self,
2638        key: impl AsRef<[u8]>,
2639        flags: u32,
2640        exptime: i64,
2641        noreply: bool,
2642        data_block: impl AsRef<[u8]>,
2643    ) -> Self {
2644        self.1.push(build_storage_cmd(
2645            b"prepend",
2646            key.as_ref(),
2647            flags,
2648            exptime,
2649            None,
2650            noreply,
2651            data_block.as_ref(),
2652        ));
2653        self
2654    }
2655
2656    /// # Example
2657    ///
2658    /// ```rust
2659    /// use mcmc_rs::Connection;
2660    /// # use smol::{io, block_on};
2661    /// #
2662    /// # block_on(async {
2663    /// let mut conn = Connection::default().await?;
2664    /// conn.pipeline().cas(b"key", 0, 0, 0, false, b"value");
2665    /// # Ok::<(), io::Error>(())
2666    /// # }).unwrap()
2667    /// ```
2668    pub fn cas(
2669        mut self,
2670        key: impl AsRef<[u8]>,
2671        flags: u32,
2672        exptime: i64,
2673        cas_unique: u64,
2674        noreply: bool,
2675        data_block: impl AsRef<[u8]>,
2676    ) -> Self {
2677        self.1.push(build_storage_cmd(
2678            b"cas",
2679            key.as_ref(),
2680            flags,
2681            exptime,
2682            Some(cas_unique),
2683            noreply,
2684            data_block.as_ref(),
2685        ));
2686        self
2687    }
2688
2689    /// # Example
2690    ///
2691    /// ```rust
2692    /// use mcmc_rs::Connection;
2693    /// # use smol::{io, block_on};
2694    /// #
2695    /// # block_on(async {
2696    /// let mut conn = Connection::default().await?;
2697    /// conn.pipeline().auth(b"username", b"password");
2698    /// # Ok::<(), io::Error>(())
2699    /// # }).unwrap()
2700    /// ```
2701    pub fn auth(mut self, username: impl AsRef<[u8]>, password: impl AsRef<[u8]>) -> Self {
2702        self.1
2703            .push(build_auth_cmd(username.as_ref(), password.as_ref()));
2704        self
2705    }
2706
2707    /// # Example
2708    ///
2709    /// ```rust
2710    /// use mcmc_rs::Connection;
2711    /// # use smol::{io, block_on};
2712    /// #
2713    /// # block_on(async {
2714    /// let mut conn = Connection::default().await?;
2715    /// conn.pipeline().delete(b"key", false);
2716    /// # Ok::<(), io::Error>(())
2717    /// # }).unwrap()
2718    /// ```
2719    pub fn delete(mut self, key: impl AsRef<[u8]>, noreply: bool) -> Self {
2720        self.1.push(build_delete_cmd(key.as_ref(), noreply));
2721        self
2722    }
2723
2724    /// # Example
2725    ///
2726    /// ```rust
2727    /// use mcmc_rs::Connection;
2728    /// # use smol::{io, block_on};
2729    /// #
2730    /// # block_on(async {
2731    /// let mut conn = Connection::default().await?;
2732    /// conn.pipeline().incr(b"key", 1, false);
2733    /// # Ok::<(), io::Error>(())
2734    /// # }).unwrap()
2735    /// ```
2736    pub fn incr(mut self, key: impl AsRef<[u8]>, value: u64, noreply: bool) -> Self {
2737        self.1
2738            .push(build_incr_decr_cmd(b"incr", key.as_ref(), value, noreply));
2739        self
2740    }
2741
2742    /// # Example
2743    ///
2744    /// ```rust
2745    /// use mcmc_rs::Connection;
2746    /// # use smol::{io, block_on};
2747    /// #
2748    /// # block_on(async {
2749    /// let mut conn = Connection::default().await?;
2750    /// conn.pipeline().decr(b"key", 1, false);
2751    /// # Ok::<(), io::Error>(())
2752    /// # }).unwrap()
2753    /// ```
2754    pub fn decr(mut self, key: impl AsRef<[u8]>, value: u64, noreply: bool) -> Self {
2755        self.1
2756            .push(build_incr_decr_cmd(b"decr", key.as_ref(), value, noreply));
2757        self
2758    }
2759
2760    /// # Example
2761    ///
2762    /// ```rust
2763    /// use mcmc_rs::Connection;
2764    /// # use smol::{io, block_on};
2765    /// #
2766    /// # block_on(async {
2767    /// let mut conn = Connection::default().await?;
2768    /// conn.pipeline().touch(b"key", 1, false);
2769    /// # Ok::<(), io::Error>(())
2770    /// # }).unwrap()
2771    /// ```
2772    pub fn touch(mut self, key: impl AsRef<[u8]>, exptime: i64, noreply: bool) -> Self {
2773        self.1.push(build_touch_cmd(key.as_ref(), exptime, noreply));
2774        self
2775    }
2776
2777    /// # Example
2778    ///
2779    /// ```rust
2780    /// use mcmc_rs::Connection;
2781    /// # use smol::{io, block_on};
2782    /// #
2783    /// # block_on(async {
2784    /// let mut conn = Connection::default().await?;
2785    /// conn.pipeline().get(b"key");
2786    /// # Ok::<(), io::Error>(())
2787    /// # }).unwrap()
2788    /// ```
2789    pub fn get(mut self, key: impl AsRef<[u8]>) -> Self {
2790        self.1
2791            .push(build_retrieval_cmd(b"get", None, &[key.as_ref()]));
2792        self
2793    }
2794
2795    /// # Example
2796    ///
2797    /// ```rust
2798    /// use mcmc_rs::Connection;
2799    /// # use smol::{io, block_on};
2800    /// #
2801    /// # block_on(async {
2802    /// let mut conn = Connection::default().await?;
2803    /// conn.pipeline().gets(b"key");
2804    /// # Ok::<(), io::Error>(())
2805    /// # }).unwrap()
2806    /// ```
2807    pub fn gets(mut self, key: impl AsRef<[u8]>) -> Self {
2808        self.1
2809            .push(build_retrieval_cmd(b"gets", None, &[key.as_ref()]));
2810        self
2811    }
2812
2813    /// # Example
2814    ///
2815    /// ```rust
2816    /// use mcmc_rs::Connection;
2817    /// # use smol::{io, block_on};
2818    /// #
2819    /// # block_on(async {
2820    /// let mut conn = Connection::default().await?;
2821    /// conn.pipeline().gat(0, b"key");
2822    /// # Ok::<(), io::Error>(())
2823    /// # }).unwrap()
2824    /// ```
2825    pub fn gat(mut self, exptime: i64, key: impl AsRef<[u8]>) -> Self {
2826        self.1
2827            .push(build_retrieval_cmd(b"gat", Some(exptime), &[key.as_ref()]));
2828        self
2829    }
2830
2831    /// # Example
2832    ///
2833    /// ```rust
2834    /// use mcmc_rs::Connection;
2835    /// # use smol::{io, block_on};
2836    /// #
2837    /// # block_on(async {
2838    /// let mut conn = Connection::default().await?;
2839    /// conn.pipeline().gats(0, b"key");
2840    /// # Ok::<(), io::Error>(())
2841    /// # }).unwrap()
2842    /// ```
2843    pub fn gats(mut self, exptime: i64, key: impl AsRef<[u8]>) -> Self {
2844        self.1
2845            .push(build_retrieval_cmd(b"gats", Some(exptime), &[key.as_ref()]));
2846        self
2847    }
2848
2849    /// # Example
2850    ///
2851    /// ```rust
2852    /// use mcmc_rs::Connection;
2853    /// # use smol::{io, block_on};
2854    /// #
2855    /// # block_on(async {
2856    /// let mut conn = Connection::default().await?;
2857    /// conn.pipeline().get_multi(&[b"key".as_slice(), b"key2".as_slice()]);
2858    /// # Ok::<(), io::Error>(())
2859    /// # }).unwrap()
2860    /// ```
2861    pub fn get_multi(mut self, keys: &[impl AsRef<[u8]>]) -> Self {
2862        self.1.push(build_retrieval_cmd(
2863            b"get",
2864            None,
2865            &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2866        ));
2867        self
2868    }
2869
2870    /// # Example
2871    ///
2872    /// ```rust
2873    /// use mcmc_rs::Connection;
2874    /// # use smol::{io, block_on};
2875    /// #
2876    /// # block_on(async {
2877    /// let mut conn = Connection::default().await?;
2878    /// conn.pipeline().gets_multi(&[b"key".as_slice(), b"key2".as_slice()]);
2879    /// # Ok::<(), io::Error>(())
2880    /// # }).unwrap()
2881    /// ```
2882    pub fn gets_multi(mut self, keys: &[impl AsRef<[u8]>]) -> Self {
2883        self.1.push(build_retrieval_cmd(
2884            b"gets",
2885            None,
2886            &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2887        ));
2888        self
2889    }
2890
2891    /// # Example
2892    ///
2893    /// ```rust
2894    /// use mcmc_rs::Connection;
2895    /// # use smol::{io, block_on};
2896    /// #
2897    /// # block_on(async {
2898    /// let mut conn = Connection::default().await?;
2899    /// conn.pipeline().gat_multi(0, &[b"key".as_slice(), b"key2".as_slice()]);
2900    /// # Ok::<(), io::Error>(())
2901    /// # }).unwrap()
2902    /// ```
2903    pub fn gat_multi(mut self, exptime: i64, keys: &[impl AsRef<[u8]>]) -> Self {
2904        self.1.push(build_retrieval_cmd(
2905            b"gat",
2906            Some(exptime),
2907            &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2908        ));
2909        self
2910    }
2911
2912    /// # Example
2913    ///
2914    /// ```rust
2915    /// use mcmc_rs::Connection;
2916    /// # use smol::{io, block_on};
2917    /// #
2918    /// # block_on(async {
2919    /// let mut conn = Connection::default().await?;
2920    /// conn.pipeline().gats_multi(0, &[b"key".as_slice(), b"key2".as_slice()]);
2921    /// # Ok::<(), io::Error>(())
2922    /// # }).unwrap()
2923    /// ```
2924    pub fn gats_multi(mut self, exptime: i64, keys: &[impl AsRef<[u8]>]) -> Self {
2925        self.1.push(build_retrieval_cmd(
2926            b"gats",
2927            Some(exptime),
2928            &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2929        ));
2930        self
2931    }
2932
2933    /// # Example
2934    ///
2935    /// ```rust
2936    /// use mcmc_rs::{Connection, StatsArg};
2937    /// # use smol::{io, block_on};
2938    /// #
2939    /// # block_on(async {
2940    /// let mut conn = Connection::default().await?;
2941    /// conn.pipeline().stats(StatsArg::Empty);
2942    /// # Ok::<(), io::Error>(())
2943    /// # }).unwrap()
2944    /// ```
2945    pub fn stats(mut self, arg: StatsArg) -> Self {
2946        self.1.push(build_stats_cmd(arg));
2947        self
2948    }
2949
2950    /// # Example
2951    ///
2952    /// ```rust
2953    /// use mcmc_rs::{Connection, SlabsAutomoveArg};
2954    /// # use smol::{io, block_on};
2955    /// #
2956    /// # block_on(async {
2957    /// let mut conn = Connection::default().await?;
2958    /// conn.pipeline().slabs_automove(SlabsAutomoveArg::Zero);
2959    /// # Ok::<(), io::Error>(())
2960    /// # }).unwrap()
2961    /// ```
2962    pub fn slabs_automove(mut self, arg: SlabsAutomoveArg) -> Self {
2963        self.1.push(build_slabs_automove_cmd(arg));
2964        self
2965    }
2966
2967    /// # Example
2968    ///
2969    /// ```rust
2970    /// use mcmc_rs::{Connection, LruCrawlerArg};
2971    /// # use smol::{io, block_on};
2972    /// #
2973    /// # block_on(async {
2974    /// let mut conn = Connection::default().await?;
2975    /// conn.pipeline().lru_crawler(LruCrawlerArg::Enable);
2976    /// # Ok::<(), io::Error>(())
2977    /// # }).unwrap()
2978    /// ```
2979    pub fn lru_crawler(mut self, arg: LruCrawlerArg) -> Self {
2980        self.1.push(build_lru_crawler_cmd(arg).to_vec());
2981        self
2982    }
2983
2984    /// # Example
2985    ///
2986    /// ```rust
2987    /// use mcmc_rs::Connection;
2988    /// # use smol::{io, block_on};
2989    /// #
2990    /// # block_on(async {
2991    /// let mut conn = Connection::default().await?;
2992    /// conn.pipeline().lru_crawler_sleep(0);
2993    /// # Ok::<(), io::Error>(())
2994    /// # }).unwrap()
2995    /// ```
2996    pub fn lru_crawler_sleep(mut self, microseconds: usize) -> Self {
2997        self.1.push(build_lru_clawler_sleep_cmd(microseconds));
2998        self
2999    }
3000
3001    /// # Example
3002    ///
3003    /// ```rust
3004    /// use mcmc_rs::Connection;
3005    /// # use smol::{io, block_on};
3006    /// #
3007    /// # block_on(async {
3008    /// let mut conn = Connection::default().await?;
3009    /// conn.pipeline().lru_crawler_tocrawl(0);
3010    /// # Ok::<(), io::Error>(())
3011    /// # }).unwrap()
3012    /// ```
3013    pub fn lru_crawler_tocrawl(mut self, arg: u32) -> Self {
3014        self.1.push(build_lru_crawler_tocrawl_cmd(arg));
3015        self
3016    }
3017
3018    /// # Example
3019    ///
3020    /// ```rust
3021    /// use mcmc_rs::{Connection, LruCrawlerCrawlArg};
3022    /// # use smol::{io, block_on};
3023    /// #
3024    /// # block_on(async {
3025    /// let mut conn = Connection::default().await?;
3026    /// conn.pipeline().lru_crawler_crawl(LruCrawlerCrawlArg::All);
3027    /// # Ok::<(), io::Error>(())
3028    /// # }).unwrap()
3029    /// ```
3030    pub fn lru_crawler_crawl(mut self, arg: LruCrawlerCrawlArg<'_>) -> Self {
3031        self.1.push(build_lru_clawler_crawl_cmd(arg));
3032        self
3033    }
3034
3035    /// # Example
3036    ///
3037    /// ```rust
3038    /// use mcmc_rs::Connection;
3039    /// # use smol::{io, block_on};
3040    /// #
3041    /// # block_on(async {
3042    /// let mut conn = Connection::default().await?;
3043    /// conn.pipeline().slabs_reassign(1, 2);
3044    /// # Ok::<(), io::Error>(())
3045    /// # }).unwrap()
3046    /// ```
3047    pub fn slabs_reassign(mut self, source_class: usize, dest_class: usize) -> Self {
3048        self.1
3049            .push(build_slabs_reassign_cmd(source_class, dest_class));
3050        self
3051    }
3052
3053    /// # Example
3054    ///
3055    /// ```rust
3056    /// use mcmc_rs::{Connection, LruCrawlerMetadumpArg};
3057    /// # use smol::{io, block_on};
3058    /// #
3059    /// # block_on(async {
3060    /// let mut conn = Connection::default().await?;
3061    /// conn.pipeline().lru_crawler_metadump(LruCrawlerMetadumpArg::All);
3062    /// # Ok::<(), io::Error>(())
3063    /// # }).unwrap()
3064    /// ```
3065    pub fn lru_crawler_metadump(mut self, arg: LruCrawlerMetadumpArg<'_>) -> Self {
3066        self.1.push(build_lru_clawler_metadump_cmd(arg));
3067        self
3068    }
3069
3070    /// # Example
3071    ///
3072    /// ```rust
3073    /// use mcmc_rs::{Connection, LruCrawlerMgdumpArg};
3074    /// # use smol::{io, block_on};
3075    /// #
3076    /// # block_on(async {
3077    /// let mut conn = Connection::default().await?;
3078    /// conn.pipeline().lru_crawler_mgdump(LruCrawlerMgdumpArg::All);
3079    /// # Ok::<(), io::Error>(())
3080    /// # }).unwrap()
3081    /// ```
3082    pub fn lru_crawler_mgdump(mut self, arg: LruCrawlerMgdumpArg<'_>) -> Self {
3083        self.1.push(build_lru_clawler_mgdump_cmd(arg));
3084        self
3085    }
3086
3087    /// # Example
3088    ///
3089    /// ```rust
3090    /// use mcmc_rs::Connection;
3091    /// # use smol::{io, block_on};
3092    /// #
3093    /// # block_on(async {
3094    /// let mut conn = Connection::default().await?;
3095    /// conn.pipeline().mn();
3096    /// # Ok::<(), io::Error>(())
3097    /// # }).unwrap()
3098    /// ```
3099    pub fn mn(mut self) -> Self {
3100        self.1.push(build_mn_cmd().to_vec());
3101        self
3102    }
3103
3104    /// # Example
3105    ///
3106    /// ```rust
3107    /// use mcmc_rs::Connection;
3108    /// # use smol::{io, block_on};
3109    /// #
3110    /// # block_on(async {
3111    /// let mut conn = Connection::default().await?;
3112    /// conn.pipeline().me(b"key");
3113    /// # Ok::<(), io::Error>(())
3114    /// # }).unwrap()
3115    /// ```
3116    pub fn me(mut self, key: impl AsRef<[u8]>) -> Self {
3117        self.1.push(build_me_cmd(key.as_ref()));
3118        self
3119    }
3120}
3121
3122#[cfg(test)]
3123mod tests {
3124    use super::*;
3125    use smol::{block_on, io::Cursor};
3126
3127    #[test]
3128    fn test_version() {
3129        block_on(async {
3130            let mut c = Cursor::new(b"version\r\nVERSION 1.2.3\r\n".to_vec());
3131            assert_eq!("1.2.3", version_cmd(&mut c).await.unwrap());
3132
3133            let mut c = Cursor::new(b"version\r\nERROR\r\n".to_vec());
3134            assert!(version_cmd(&mut c).await.is_err())
3135        })
3136    }
3137
3138    #[test]
3139    fn test_quit() {
3140        block_on(async {
3141            let mut c = Cursor::new(b"quit\r\n".to_vec());
3142            assert!(quit_cmd(&mut c).await.is_ok())
3143        })
3144    }
3145
3146    #[test]
3147    fn test_shutdown() {
3148        block_on(async {
3149            let mut c = Cursor::new(b"shutdown\r\n".to_vec());
3150            assert!(shutdown_cmd(&mut c, false).await.is_ok());
3151
3152            let mut c = Cursor::new(b"shutdown graceful\r\n".to_vec());
3153            assert!(shutdown_cmd(&mut c, true).await.is_ok())
3154        })
3155    }
3156
3157    #[test]
3158    fn test_cache_memlimit() {
3159        block_on(async {
3160            let mut c = Cursor::new(b"cache_memlimit 1\r\nOK\r\n".to_vec());
3161            assert!(cache_memlimit_cmd(&mut c, 1, false).await.is_ok());
3162
3163            let mut c = Cursor::new(b"cache_memlimit 1 noreply\r\n".to_vec());
3164            assert!(cache_memlimit_cmd(&mut c, 1, true).await.is_ok());
3165
3166            let mut c = Cursor::new(b"cache_memlimit 1\r\nERROR\r\n".to_vec());
3167            assert!(cache_memlimit_cmd(&mut c, 1, false).await.is_err());
3168        })
3169    }
3170
3171    #[test]
3172    fn test_flush_all() {
3173        block_on(async {
3174            let mut c = Cursor::new(b"flush_all\r\nOK\r\n".to_vec());
3175            assert!(flush_all_cmd(&mut c, None, false).await.is_ok());
3176
3177            let mut c = Cursor::new(b"flush_all 1 noreply\r\n".to_vec());
3178            assert!(flush_all_cmd(&mut c, Some(1), true).await.is_ok());
3179
3180            let mut c = Cursor::new(b"flush_all\r\nERROR\r\n".to_vec());
3181            assert!(flush_all_cmd(&mut c, None, false).await.is_err());
3182        })
3183    }
3184
3185    #[test]
3186    fn test_storage() {
3187        block_on(async {
3188            let mut c = Cursor::new(b"cas key 0 0 0 0\r\nvalue\r\nSTORED\r\n".to_vec());
3189            assert!(
3190                storage_cmd(&mut c, b"cas", b"key", 0, 0, Some(0), false, b"value")
3191                    .await
3192                    .unwrap()
3193            );
3194
3195            let mut c = Cursor::new(b"append key 0 0 0 noreply\r\nvalue\r\n".to_vec());
3196            assert!(
3197                storage_cmd(&mut c, b"append", b"key", 0, 0, None, true, b"value")
3198                    .await
3199                    .unwrap()
3200            );
3201
3202            let mut c = Cursor::new(b"prepend key 0 0 0\r\nvalue\r\nNOT_STORED\r\n".to_vec());
3203            assert!(
3204                !storage_cmd(&mut c, b"prepend", b"key", 0, 0, None, false, b"value")
3205                    .await
3206                    .unwrap()
3207            );
3208
3209            let mut c = Cursor::new(b"add key 0 0 0\r\nvalue\r\nERROR\r\n".to_vec());
3210            assert!(
3211                storage_cmd(&mut c, b"add", b"key", 0, 0, None, false, b"value")
3212                    .await
3213                    .is_err()
3214            )
3215        })
3216    }
3217
3218    #[test]
3219    fn test_delete() {
3220        block_on(async {
3221            let mut c = Cursor::new(b"delete key\r\nDELETED\r\n".to_vec());
3222            assert!(delete_cmd(&mut c, b"key", false).await.unwrap());
3223
3224            let mut c = Cursor::new(b"delete key\r\nNOT_FOUND\r\n".to_vec());
3225            assert!(!delete_cmd(&mut c, b"key", false).await.unwrap());
3226
3227            let mut c = Cursor::new(b"delete key noreply\r\n".to_vec());
3228            assert!(delete_cmd(&mut c, b"key", true).await.unwrap());
3229
3230            let mut c = Cursor::new(b"delete key\r\nERROR\r\n".to_vec());
3231            assert!(delete_cmd(&mut c, b"key", false).await.is_err());
3232        })
3233    }
3234
3235    #[test]
3236    fn test_auth() {
3237        block_on(async {
3238            let mut c = Cursor::new(b"set _ _ _ 3\r\na b\r\nSTORED\r\n".to_vec());
3239            assert!(auth_cmd(&mut c, b"a", b"b").await.is_ok());
3240
3241            let mut c = Cursor::new(b"set _ _ _ 3\r\na b\r\nERROR\r\n".to_vec());
3242            assert!(auth_cmd(&mut c, b"a", b"b").await.is_err());
3243        })
3244    }
3245
3246    #[test]
3247    fn test_incr_decr() {
3248        block_on(async {
3249            let mut c = Cursor::new(b"incr key 1\r\n2\r\n".to_vec());
3250            assert_eq!(
3251                incr_decr_cmd(&mut c, b"incr", b"key", 1, false)
3252                    .await
3253                    .unwrap(),
3254                Some(2)
3255            );
3256
3257            let mut c = Cursor::new(b"incr key 1 noreply\r\n".to_vec());
3258            assert!(
3259                incr_decr_cmd(&mut c, b"incr", b"key", 1, true)
3260                    .await
3261                    .unwrap()
3262                    .is_none(),
3263            );
3264
3265            let mut c = Cursor::new(b"incr key 1\r\nNOT_FOUND\r\n".to_vec());
3266            assert!(
3267                incr_decr_cmd(&mut c, b"incr", b"key", 1, false)
3268                    .await
3269                    .unwrap()
3270                    .is_none()
3271            );
3272
3273            let mut c = Cursor::new(b"incr key 1\r\nERROR\r\n".to_vec());
3274            assert!(
3275                incr_decr_cmd(&mut c, b"incr", b"key", 1, false)
3276                    .await
3277                    .is_err()
3278            );
3279        })
3280    }
3281
3282    #[test]
3283    fn test_touch() {
3284        block_on(async {
3285            let mut c = Cursor::new(b"touch key 0\r\nTOUCHED\r\n".to_vec());
3286            assert!(touch_cmd(&mut c, b"key", 0, false).await.unwrap());
3287
3288            let mut c = Cursor::new(b"touch key 0\r\nNOT_FOUND\r\n".to_vec());
3289            assert!(!touch_cmd(&mut c, b"key", 0, false).await.unwrap());
3290
3291            let mut c = Cursor::new(b"touch key 0 noreply\r\n".to_vec());
3292            assert!(touch_cmd(&mut c, b"key", 0, true).await.unwrap());
3293
3294            let mut c = Cursor::new(b"touch key 0\r\nERROR\r\n".to_vec());
3295            assert!(touch_cmd(&mut c, b"key", 0, false).await.is_err())
3296        })
3297    }
3298
3299    #[test]
3300    fn test_retrieval() {
3301        block_on(async {
3302            let mut c = Cursor::new(b"gets key\r\nEND\r\n".to_vec());
3303            assert_eq!(
3304                retrieval_cmd(&mut c, b"gets", None, &[b"key"])
3305                    .await
3306                    .unwrap(),
3307                vec![]
3308            );
3309
3310            let mut c = Cursor::new(b"gat 0 key\r\nVALUE key 0 1\r\na\r\nEND\r\n".to_vec());
3311            assert_eq!(
3312                retrieval_cmd(&mut c, b"gat", Some(0), &[b"key"])
3313                    .await
3314                    .unwrap(),
3315                vec![Item {
3316                    key: "key".to_string(),
3317                    flags: 0,
3318                    cas_unique: None,
3319                    data_block: b"a".to_vec(),
3320                }]
3321            );
3322
3323            let mut c = Cursor::new(
3324                b"gats 0 key key2\r\nVALUE key 0 1 0\r\na\r\nVALUE key2 0 1 0\r\na\r\nEND\r\n"
3325                    .to_vec(),
3326            );
3327            assert_eq!(
3328                retrieval_cmd(&mut c, b"gats", Some(0), &[b"key", b"key2"])
3329                    .await
3330                    .unwrap(),
3331                vec![
3332                    Item {
3333                        key: "key".to_string(),
3334                        flags: 0,
3335                        cas_unique: Some(0),
3336                        data_block: b"a".to_vec()
3337                    },
3338                    Item {
3339                        key: "key2".to_string(),
3340                        flags: 0,
3341                        cas_unique: Some(0),
3342                        data_block: b"a".to_vec()
3343                    }
3344                ]
3345            );
3346
3347            let mut c = Cursor::new(b"get key\r\nERROR\r\n".to_vec());
3348            assert!(
3349                retrieval_cmd(&mut c, b"get", None, &[b"key"])
3350                    .await
3351                    .is_err()
3352            )
3353        })
3354    }
3355
3356    #[test]
3357    fn test_stats() {
3358        block_on(async {
3359            let mut c =
3360                Cursor::new(b"stats\r\nSTAT version 1.2.3\r\nSTAT threads 4\r\nEND\r\n".to_vec());
3361            assert_eq!(
3362                stats_cmd(&mut c, StatsArg::Empty).await.unwrap(),
3363                HashMap::from([
3364                    ("version".to_string(), "1.2.3".to_string()),
3365                    ("threads".to_string(), "4".to_string()),
3366                ])
3367            );
3368
3369            let mut c = Cursor::new(b"stats settings\r\nERROR\r\n".to_vec());
3370            assert!(stats_cmd(&mut c, StatsArg::Settings).await.is_err());
3371
3372            let mut c = Cursor::new(b"stats items\r\nERROR\r\n".to_vec());
3373            assert!(stats_cmd(&mut c, StatsArg::Items).await.is_err());
3374
3375            let mut c = Cursor::new(b"stats sizes\r\nERROR\r\n".to_vec());
3376            assert!(stats_cmd(&mut c, StatsArg::Sizes).await.is_err());
3377
3378            let mut c = Cursor::new(b"stats slabs\r\nERROR\r\n".to_vec());
3379            assert!(stats_cmd(&mut c, StatsArg::Slabs).await.is_err());
3380
3381            let mut c = Cursor::new(b"stats conns\r\nERROR\r\n".to_vec());
3382            assert!(stats_cmd(&mut c, StatsArg::Conns).await.is_err())
3383        })
3384    }
3385
3386    #[test]
3387    fn test_slabs_automove() {
3388        block_on(async {
3389            let mut c = Cursor::new(b"slabs automove 0\r\nOK\r\n".to_vec());
3390            assert!(
3391                slabs_automove_cmd(&mut c, SlabsAutomoveArg::Zero)
3392                    .await
3393                    .is_ok()
3394            );
3395
3396            let mut c = Cursor::new(b"slabs automove 1\r\nERROR\r\n".to_vec());
3397            assert!(
3398                slabs_automove_cmd(&mut c, SlabsAutomoveArg::One)
3399                    .await
3400                    .is_err()
3401            );
3402
3403            let mut c = Cursor::new(b"slabs automove 2\r\nERROR\r\n".to_vec());
3404            assert!(
3405                slabs_automove_cmd(&mut c, SlabsAutomoveArg::Two)
3406                    .await
3407                    .is_err()
3408            )
3409        })
3410    }
3411
3412    #[test]
3413    fn test_lru_crawler() {
3414        block_on(async {
3415            let mut c = Cursor::new(b"lru_crawler enable\r\nOK\r\n".to_vec());
3416            assert!(lru_crawler_cmd(&mut c, LruCrawlerArg::Enable).await.is_ok());
3417
3418            let mut c = Cursor::new(b"lru_crawler disable\r\nERROR\r\n".to_vec());
3419            assert!(
3420                lru_crawler_cmd(&mut c, LruCrawlerArg::Disable)
3421                    .await
3422                    .is_err()
3423            )
3424        })
3425    }
3426
3427    #[test]
3428    fn test_lru_crawler_sleep() {
3429        block_on(async {
3430            let mut c = Cursor::new(b"lru_crawler sleep 1000000\r\nOK\r\n".to_vec());
3431            assert!(lru_crawler_sleep_cmd(&mut c, 1_000_000).await.is_ok());
3432
3433            let mut c = Cursor::new(b"lru_crawler sleep 0\r\nERROR\r\n".to_vec());
3434            assert!(lru_crawler_sleep_cmd(&mut c, 0).await.is_err())
3435        })
3436    }
3437
3438    #[test]
3439    fn test_lru_crawler_tocrawl() {
3440        block_on(async {
3441            let mut c = Cursor::new(b"lru_crawler tocrawl 0\r\nOK\r\n".to_vec());
3442            assert!(lru_crawler_tocrawl_cmd(&mut c, 0).await.is_ok());
3443
3444            let mut c = Cursor::new(b"lru_crawler tocrawl 0\r\nERROR\r\n".to_vec());
3445            assert!(lru_crawler_tocrawl_cmd(&mut c, 0).await.is_err())
3446        })
3447    }
3448
3449    #[test]
3450    fn test_lru_crawler_crawl() {
3451        block_on(async {
3452            let mut c = Cursor::new(b"lru_crawler crawl 1,2,3\r\nOK\r\n".to_vec());
3453            assert!(
3454                lru_crawler_crawl_cmd(&mut c, LruCrawlerCrawlArg::Classids(&[1, 2, 3]))
3455                    .await
3456                    .is_ok()
3457            );
3458
3459            let mut c = Cursor::new(b"lru_crawler crawl all\r\nERROR\r\n".to_vec());
3460            assert!(
3461                lru_crawler_crawl_cmd(&mut c, LruCrawlerCrawlArg::All)
3462                    .await
3463                    .is_err()
3464            )
3465        })
3466    }
3467
3468    #[test]
3469    fn test_slabs_reassign() {
3470        block_on(async {
3471            let mut c = Cursor::new(b"slabs reassign 1 10\r\nOK\r\n".to_vec());
3472            assert!(slabs_reassign_cmd(&mut c, 1, 10).await.is_ok());
3473
3474            let mut c = Cursor::new(b"slabs reassign 1 10\r\nERROR\r\n".to_vec());
3475            assert!(slabs_reassign_cmd(&mut c, 1, 10).await.is_err())
3476        })
3477    }
3478
3479    #[test]
3480    fn test_lru_crawler_metadump() {
3481        block_on(async {
3482            let mut c = Cursor::new(b"lru_crawler metadump all\r\nkey=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nkey=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nEND\r\n".to_vec());
3483            assert_eq!(
3484                lru_crawler_metadump_cmd(&mut c, LruCrawlerMetadumpArg::All)
3485                    .await
3486                    .unwrap(),
3487                [
3488                    "key=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0",
3489                    "key=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0"
3490                ]
3491            );
3492
3493            let mut c = Cursor::new(b"lru_crawler metadump 1,2,3\r\nERROR\r\n".to_vec());
3494            assert!(
3495                lru_crawler_metadump_cmd(&mut c, LruCrawlerMetadumpArg::Classids(&[1, 2, 3]))
3496                    .await
3497                    .is_err()
3498            );
3499
3500            let mut c = Cursor::new(b"lru_crawler metadump hash\r\nERROR\r\n".to_vec());
3501            assert!(
3502                lru_crawler_metadump_cmd(&mut c, LruCrawlerMetadumpArg::Hash)
3503                    .await
3504                    .is_err()
3505            )
3506        })
3507    }
3508
3509    #[test]
3510    fn test_lru_crawler_mgdump() {
3511        block_on(async {
3512            let mut c =
3513                Cursor::new(b"lru_crawler mgdump 3\r\nmg key\r\nmg key2\r\nEN\r\n".to_vec());
3514            assert_eq!(
3515                lru_crawler_mgdump_cmd(&mut c, LruCrawlerMgdumpArg::Classids(&[3]))
3516                    .await
3517                    .unwrap(),
3518                ["key", "key2"]
3519            );
3520
3521            let mut c = Cursor::new(b"lru_crawler mgdump all\r\nERROR\r\n".to_vec());
3522            assert!(
3523                lru_crawler_mgdump_cmd(&mut c, LruCrawlerMgdumpArg::All)
3524                    .await
3525                    .is_err()
3526            );
3527
3528            let mut c = Cursor::new(b"lru_crawler mgdump hash\r\nERROR\r\n".to_vec());
3529            assert!(
3530                lru_crawler_mgdump_cmd(&mut c, LruCrawlerMgdumpArg::Hash)
3531                    .await
3532                    .is_err()
3533            )
3534        })
3535    }
3536
3537    #[test]
3538    fn test_mn() {
3539        block_on(async {
3540            let mut c = Cursor::new(b"mn\r\nMN\r\n".to_vec());
3541            assert!(mn_cmd(&mut c).await.is_ok());
3542
3543            let mut c = Cursor::new(b"mn\r\nERROR\r\n".to_vec());
3544            assert!(mn_cmd(&mut c).await.is_err())
3545        })
3546    }
3547
3548    #[test]
3549    fn test_me() {
3550        block_on(async {
3551            let mut c = Cursor::new(b"me key\r\nEN\r\n".to_vec());
3552            assert!(me_cmd(&mut c, b"key").await.unwrap().is_none());
3553
3554            let mut c = Cursor::new(
3555                b"me key\r\nME key exp=-1 la=3 cas=2 fetch=no cls=1 size=63\r\n".to_vec(),
3556            );
3557            assert_eq!(
3558                me_cmd(&mut c, b"key").await.unwrap().unwrap(),
3559                "key exp=-1 la=3 cas=2 fetch=no cls=1 size=63"
3560            );
3561
3562            let mut c = Cursor::new(b"me key\r\nERROR\r\n".to_vec());
3563            assert!(me_cmd(&mut c, b"key").await.is_err());
3564        })
3565    }
3566
3567    #[test]
3568    fn test_pipeline() {
3569        block_on(async {
3570            let cmds = [
3571                b"version\r\n".to_vec(),
3572                b"quit\r\n".to_vec(),
3573                b"shutdown\r\n".to_vec(),
3574                b"cache_memlimit 1\r\n".to_vec(),
3575                b"cache_memlimit 1 noreply\r\n".to_vec(),
3576                b"flush_all\r\n".to_vec(),
3577                b"flush_all 1 noreply\r\n".to_vec(),
3578                b"cas key 0 0 5 0\r\nvalue\r\n".to_vec(),
3579                b"append key 0 0 5 noreply\r\nvalue\r\n".to_vec(),
3580                b"delete key\r\n".to_vec(),
3581                b"delete key noreply\r\n".to_vec(),
3582                b"set _ _ _ 3\r\na b\r\n".to_vec(),
3583                b"incr key 1\r\n".to_vec(),
3584                b"incr key 1 noreply\r\n".to_vec(),
3585                b"touch key 0\r\n".to_vec(),
3586                b"touch key 0 noreply\r\n".to_vec(),
3587                b"gets key\r\n".to_vec(),
3588                b"get key key2\r\n".to_vec(),
3589                b"gat 0 key key2\r\n".to_vec(),
3590                b"gats 0 key\r\n".to_vec(),
3591                b"stats\r\n".to_vec(),
3592                b"slabs automove 0\r\n".to_vec(),
3593                b"lru_crawler enable\r\n".to_vec(),
3594                b"lru_crawler disable\r\n".to_vec(),
3595                b"lru_crawler sleep 1000000\r\n".to_vec(),
3596                b"lru_crawler tocrawl 0\r\n".to_vec(),
3597                b"lru_crawler crawl 1,2,3\r\n".to_vec(),
3598                b"slabs reassign 1 10\r\n".to_vec(),
3599                b"lru_crawler metadump all\r\n".to_vec(),
3600                b"lru_crawler mgdump 3\r\n".to_vec(),
3601                b"mn\r\n".to_vec(),
3602                b"me key\r\n".to_vec(),
3603            ];
3604            let rps = [
3605                b"VERSION 1.2.3\r\n".to_vec(),
3606                b"OK\r\n".to_vec(),
3607                b"OK\r\n".to_vec(),
3608                b"STORED\r\n".to_vec(),
3609                b"DELETED\r\n".to_vec(),
3610                b"STORED\r\n".to_vec(),
3611                b"2\r\n".to_vec(),
3612                b"TOUCHED\r\n".to_vec(),
3613                b"END\r\n".to_vec(),
3614                b"END\r\n".to_vec(),
3615                b"VALUE key 0 1 0\r\na\r\nVALUE key2 0 1 0\r\na\r\nEND\r\n".to_vec(),
3616                b"VALUE key 0 1 0\r\na\r\nEND\r\n".to_vec(),
3617                b"STAT version 1.2.3\r\nSTAT threads 4\r\nEND\r\n".to_vec(),
3618                b"OK\r\n".to_vec(),
3619                b"OK\r\n".to_vec(),
3620                b"OK\r\n".to_vec(),
3621                b"OK\r\n".to_vec(),
3622                b"OK\r\n".to_vec(),
3623                b"OK\r\n".to_vec(),
3624                b"OK\r\n".to_vec(),
3625                b"key=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nkey=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nEND\r\n".to_vec(),
3626                b"mg key\r\nmg key2\r\nEN\r\n".to_vec(),
3627                b"MN\r\n".to_vec(),
3628                b"ME key exp=-1 la=3 cas=2 fetch=no cls=1 size=63\r\n".to_vec(),
3629            ];
3630            let mut c = Cursor::new([cmds.concat(), rps.concat()].concat().to_vec());
3631            assert_eq!(
3632                execute_cmd(&mut c, &cmds).await.unwrap(),
3633                [
3634                    PipelineResponse::String("1.2.3".to_string()),
3635                    PipelineResponse::Unit(()),
3636                    PipelineResponse::Unit(()),
3637                    PipelineResponse::Unit(()),
3638                    PipelineResponse::Unit(()),
3639                    PipelineResponse::Unit(()),
3640                    PipelineResponse::Unit(()),
3641                    PipelineResponse::Bool(true),
3642                    PipelineResponse::Bool(true),
3643                    PipelineResponse::Bool(true),
3644                    PipelineResponse::Bool(true),
3645                    PipelineResponse::Unit(()),
3646                    PipelineResponse::Value(Some(2)),
3647                    PipelineResponse::Value(None),
3648                    PipelineResponse::Bool(true),
3649                    PipelineResponse::Bool(true),
3650                    PipelineResponse::Item(Item {
3651                        key: "key".to_string(),
3652                        flags: 0,
3653                        cas_unique: Some(0),
3654                        data_block: b"a".to_vec()
3655                    }),
3656                    PipelineResponse::Item(Item {
3657                        key: "key2".to_string(),
3658                        flags: 0,
3659                        cas_unique: Some(0),
3660                        data_block: b"a".to_vec()
3661                    }),
3662                    PipelineResponse::Item(Item {
3663                        key: "key".to_string(),
3664                        flags: 0,
3665                        cas_unique: Some(0),
3666                        data_block: b"a".to_vec()
3667                    }),
3668                    PipelineResponse::HashMap(HashMap::from([
3669                        ("threads".to_string(), "4".to_string()),
3670                        ("version".to_string(), "1.2.3".to_string())
3671                    ])),
3672                    PipelineResponse::Unit(()),
3673                    PipelineResponse::Unit(()),
3674                    PipelineResponse::Unit(()),
3675                    PipelineResponse::Unit(()),
3676                    PipelineResponse::Unit(()),
3677                    PipelineResponse::Unit(()),
3678                    PipelineResponse::Unit(()),
3679                    PipelineResponse::VecString(vec![
3680                        "key=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0"
3681                            .to_string(),
3682                        "key=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0"
3683                            .to_string()
3684                    ]),
3685                    PipelineResponse::VecString(vec!["key".to_string(), "key2".to_string()]),
3686                    PipelineResponse::Unit(()),
3687                    PipelineResponse::OptionString(Some(
3688                        "key exp=-1 la=3 cas=2 fetch=no cls=1 size=63".to_string()
3689                    ))
3690                ]
3691            );
3692
3693            let cmds = [b"version\r\n".to_vec(), b"quit\r\n".to_vec()];
3694            let rps = [b"ERROR\r\n".to_vec(), b"OK\r\n".to_vec()];
3695            let mut c = Cursor::new([cmds.concat(), rps.concat()].concat().to_vec());
3696            assert!(execute_cmd(&mut c, &cmds).await.is_err());
3697        })
3698    }
3699}