1use std::collections::HashMap;
2
3use crc32fast;
4use smol::io::{self, BufReader};
5use smol::net::{TcpStream, UdpSocket, unix::UnixStream};
6use smol::prelude::*;
7
/// Selects the optional sub-command sent with `stats`.
///
/// `Empty` produces a bare `stats`; every other variant appends its
/// lowercase name (for example `stats settings`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatsArg {
    Empty,
    Settings,
    Items,
    Sizes,
    Slabs,
    Conns,
}
16
/// Mode argument of `slabs automove`; the variants are sent as `0`, `1` and `2`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlabsAutomoveArg {
    Zero,
    One,
    Two,
}
22
/// Argument of the bare `lru_crawler` command: sent as `enable` or `disable`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LruCrawlerArg {
    Enable,
    Disable,
}
27
/// Target of `lru_crawler crawl`.
///
/// `Classids` is sent as a comma-separated list of the given ids, `All` as
/// the literal `all`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LruCrawlerCrawlArg<'a> {
    Classids(&'a [usize]),
    All,
}
32
/// Target of `lru_crawler metadump`.
///
/// `Classids` is sent as a comma-separated list of the given ids, `All` as
/// `all` and `Hash` as `hash`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LruCrawlerMetadumpArg<'a> {
    Classids(&'a [usize]),
    All,
    Hash,
}
38
/// Target of `lru_crawler mgdump`.
///
/// `Classids` is sent as a comma-separated list of the given ids, `All` as
/// `all` and `Hash` as `hash`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LruCrawlerMgdumpArg<'a> {
    Classids(&'a [usize]),
    All,
    Hash,
}
44
/// One `VALUE` entry returned by a retrieval command.
///
/// `cas_unique` is `Some` only when the reply line carried a fifth field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Item {
    pub key: String,
    pub flags: u32,
    pub cas_unique: Option<u64>,
    pub data_block: Vec<u8>,
}

/// A single parsed reply produced while executing a batch of commands.
///
/// Each variant wraps the return type of one family of reply parsers
/// (for example `Bool` for storage/delete/touch, `Value` for incr/decr).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineResponse {
    Bool(bool),
    Item(Item),
    String(String),
    OptionString(Option<String>),
    VecString(Vec<String>),
    Unit(()),
    Value(Option<u64>),
    HashMap(HashMap<String, String>),
}
64
65async fn parse_storage_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
66 s: &mut S,
67 noreply: bool,
68) -> io::Result<bool> {
69 if noreply {
70 return Ok(true);
71 };
72 let mut line = String::new();
73 s.read_line(&mut line).await?;
74 match line.as_str() {
75 "STORED\r\n" => Ok(true),
76 "NOT_STORED\r\n" | "EXISTS\r\n" | "NOT_FOUND\r\n" => Ok(false),
77 _ => Err(io::Error::other(line)),
78 }
79}
80
81async fn parse_retrieval_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
82 s: &mut S,
83) -> io::Result<Vec<Item>> {
84 let mut items = Vec::new();
85 let mut line = String::new();
86 s.read_line(&mut line).await?;
87 while line.starts_with("VALUE") {
88 let mut split = line.split(' ');
89 split.next();
90 let (key, flags, bytes, cas_unique) = (
91 split.next().unwrap().to_string(),
92 split.next().unwrap().parse::<u32>().unwrap(),
93 split.next().unwrap().trim_end().parse::<usize>().unwrap(),
94 split.next().map(|x| x.trim_end().parse::<u64>().unwrap()),
95 );
96 let mut data_block = vec![0; bytes];
97 s.read_exact(&mut data_block).await?;
98 s.read_line(&mut String::new()).await?;
99 items.push(Item {
100 key,
101 flags,
102 cas_unique,
103 data_block,
104 });
105 line.clear();
106 s.read_line(&mut line).await?;
107 }
108 if line == "END\r\n" {
109 Ok(items)
110 } else {
111 Err(io::Error::other(line))
112 }
113}
114
115async fn parse_version_rp<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<String> {
116 let mut line = String::new();
117 let n = s.read_line(&mut line).await?;
118 if line.starts_with("VERSION") {
119 Ok(line[8..n - 2].to_string())
120 } else {
121 Err(io::Error::other(line))
122 }
123}
124
125async fn parse_ok_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
126 s: &mut S,
127 noreply: bool,
128) -> io::Result<()> {
129 if noreply {
130 return Ok(());
131 };
132 let mut line = String::new();
133 s.read_line(&mut line).await?;
134 if line == "OK\r\n" {
135 Ok(())
136 } else {
137 Err(io::Error::other(line))
138 }
139}
140
141async fn parse_delete_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
142 s: &mut S,
143 noreply: bool,
144) -> io::Result<bool> {
145 if noreply {
146 return Ok(true);
147 };
148 let mut line = String::new();
149 s.read_line(&mut line).await?;
150 match line.as_str() {
151 "DELETED\r\n" => Ok(true),
152 "NOT_FOUND\r\n" => Ok(false),
153 _ => Err(io::Error::other(line)),
154 }
155}
156
157async fn parse_auth_rp<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<()> {
158 let mut line = String::new();
159 s.read_line(&mut line).await?;
160 match line.as_str() {
161 "STORED\r\n" => Ok(()),
162 _ => Err(io::Error::other(line)),
163 }
164}
165
166async fn parse_incr_decr_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
167 s: &mut S,
168 noreply: bool,
169) -> io::Result<Option<u64>> {
170 if noreply {
171 return Ok(None);
172 };
173 let mut line = String::new();
174 s.read_line(&mut line).await?;
175 if line == "NOT_FOUND\r\n" {
176 return Ok(None);
177 };
178 match line.trim_end().parse::<u64>() {
179 Ok(v) => Ok(Some(v)),
180 Err(_) => Err(io::Error::other(line)),
181 }
182}
183
184async fn parse_touch_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
185 s: &mut S,
186 noreply: bool,
187) -> io::Result<bool> {
188 if noreply {
189 return Ok(true);
190 };
191 let mut line = String::new();
192 s.read_line(&mut line).await?;
193 if line == "TOUCHED\r\n" {
194 Ok(true)
195 } else if line == "NOT_FOUND\r\n" {
196 Ok(false)
197 } else {
198 Err(io::Error::other(line))
199 }
200}
201
202async fn parse_stats_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
203 s: &mut S,
204) -> io::Result<HashMap<String, String>> {
205 let mut lines = s.lines();
206 let mut items = HashMap::new();
207 while let Some(line) = lines.next().await {
208 let data = line?;
209 if data.starts_with("STAT") {
210 let mut split = data.split(' ');
211 split.next();
212 let (k, v) = (
213 split.next().unwrap().to_string(),
214 split.next().unwrap().trim_end().to_string(),
215 );
216 items.insert(k, v);
217 } else if data == "END" {
218 break;
219 } else {
220 return Err(io::Error::other(data));
221 };
222 }
223 Ok(items)
224}
225
226async fn parse_lru_crawler_metadump_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
227 s: &mut S,
228) -> io::Result<Vec<String>> {
229 let mut line = String::new();
230 s.read_line(&mut line).await?;
231 let mut items = Vec::new();
232 while line.starts_with("key=") {
233 items.push(line.trim_end().to_string());
234 line.clear();
235 s.read_line(&mut line).await?;
236 }
237 if line == "END\r\n" {
238 Ok(items)
239 } else {
240 Err(io::Error::other(line))
241 }
242}
243
244async fn parse_lru_crawler_mgdump_rp<S: AsyncBufRead + AsyncWrite + Unpin>(
245 s: &mut S,
246) -> io::Result<Vec<String>> {
247 let mut line = String::new();
248 s.read_line(&mut line).await?;
249 let mut items = Vec::new();
250 while line.starts_with("mg ") {
251 let mut split = line.split(' ');
252 split.next();
253 items.push(split.next().unwrap().trim_end().to_string());
254 line.clear();
255 s.read_line(&mut line).await?;
256 }
257 if line == "EN\r\n" {
258 Ok(items)
259 } else {
260 Err(io::Error::other(line))
261 }
262}
263
264async fn parse_mn_rp<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<()> {
265 let mut line = String::new();
266 s.read_line(&mut line).await?;
267 if line == "MN\r\n" {
268 Ok(())
269 } else {
270 Err(io::Error::other(line))
271 }
272}
273
274async fn parse_me_r<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<Option<String>> {
275 let mut line = String::new();
276 s.read_line(&mut line).await?;
277 if line == "EN\r\n" {
278 Ok(None)
279 } else if line.starts_with("ME") {
280 Ok(Some(line[3..line.len() - 2].to_string()))
281 } else {
282 Err(io::Error::other(line))
283 }
284}
285
/// Serializes a storage command.
///
/// Layout: `<command> <key> <flags> <exptime> <len>[ <cas>][ noreply]\r\n`
/// followed by the data block and a closing `\r\n`, where `<len>` is the
/// byte length of `data_block`.
fn build_storage_cmd(
    command_name: &[u8],
    key: &[u8],
    flags: u32,
    exptime: i64,
    cas_unique: Option<u64>,
    noreply: bool,
    data_block: &[u8],
) -> Vec<u8> {
    // Everything after the key on the header line is plain ASCII text.
    let mut tail = format!(" {flags} {exptime} {}", data_block.len());
    if let Some(cas) = cas_unique {
        tail.push(' ');
        tail.push_str(&cas.to_string());
    }
    if noreply {
        tail.push_str(" noreply");
    }

    let mut cmd = Vec::new();
    cmd.extend_from_slice(command_name);
    cmd.push(b' ');
    cmd.extend_from_slice(key);
    cmd.extend_from_slice(tail.as_bytes());
    cmd.extend_from_slice(b"\r\n");
    cmd.extend_from_slice(data_block);
    cmd.extend_from_slice(b"\r\n");
    cmd
}
318
/// Serializes a retrieval command as
/// `<command> [<exptime> ]<key1> <key2> …\r\n`.
fn build_retrieval_cmd(command_name: &[u8], exptime: Option<i64>, keys: &[&[u8]]) -> Vec<u8> {
    let mut cmd = command_name.to_vec();
    cmd.push(b' ');
    if let Some(ttl) = exptime {
        cmd.extend_from_slice(ttl.to_string().as_bytes());
        cmd.push(b' ');
    }
    for (idx, key) in keys.iter().enumerate() {
        if idx > 0 {
            cmd.push(b' ');
        }
        cmd.extend_from_slice(key);
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
333
/// Returns the wire form of the `version` command.
fn build_version_cmd() -> &'static [u8] {
    const VERSION: &[u8] = b"version\r\n";
    VERSION
}
337
/// Returns the wire form of the `quit` command.
fn build_quit_cmd() -> &'static [u8] {
    const QUIT: &[u8] = b"quit\r\n";
    QUIT
}
341
/// Returns `shutdown graceful\r\n` when `graceful` is set, otherwise a
/// plain `shutdown\r\n`.
fn build_shutdown_cmd(graceful: bool) -> &'static [u8] {
    match graceful {
        true => b"shutdown graceful\r\n",
        false => b"shutdown\r\n",
    }
}
349
/// Serializes `cache_memlimit <limit>[ noreply]\r\n`.
fn build_cache_memlimit_cmd(limit: usize, noreply: bool) -> Vec<u8> {
    let suffix = if noreply { " noreply" } else { "" };
    format!("cache_memlimit {limit}{suffix}\r\n").into_bytes()
}
354
/// Serializes `flush_all[ <exptime>][ noreply]\r\n`.
fn build_flush_all_cmd(exptime: Option<i64>, noreply: bool) -> Vec<u8> {
    let mut cmd = String::from("flush_all");
    if let Some(delay) = exptime {
        cmd.push(' ');
        cmd.push_str(&delay.to_string());
    }
    if noreply {
        cmd.push_str(" noreply");
    }
    cmd.push_str("\r\n");
    cmd.into_bytes()
}
363
/// Serializes `delete <key>[ noreply]\r\n`.
fn build_delete_cmd(key: &[u8], noreply: bool) -> Vec<u8> {
    let mut cmd = b"delete ".to_vec();
    cmd.extend_from_slice(key);
    if noreply {
        cmd.extend_from_slice(b" noreply");
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
368
/// Serializes the authentication request: a `set _ _ _ <len>` header whose
/// data block is `<username> <password>`, with `<len>` counting both
/// values plus the separating space.
fn build_auth_cmd(username: &[u8], password: &[u8]) -> Vec<u8> {
    let payload_len = username.len() + password.len() + 1;
    let mut cmd = format!("set _ _ _ {payload_len}\r\n").into_bytes();
    cmd.extend_from_slice(username);
    cmd.push(b' ');
    cmd.extend_from_slice(password);
    cmd.extend_from_slice(b"\r\n");
    cmd
}
381
/// Serializes `<command> <key> <value>[ noreply]\r\n`, used for both
/// `incr` and `decr`.
fn build_incr_decr_cmd(command_name: &[u8], key: &[u8], value: u64, noreply: bool) -> Vec<u8> {
    let mut cmd = command_name.to_vec();
    cmd.push(b' ');
    cmd.extend_from_slice(key);
    cmd.push(b' ');
    cmd.extend_from_slice(value.to_string().as_bytes());
    if noreply {
        cmd.extend_from_slice(b" noreply");
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
395
/// Serializes `touch <key> <exptime>[ noreply]\r\n`.
fn build_touch_cmd(key: &[u8], exptime: i64, noreply: bool) -> Vec<u8> {
    let mut cmd = b"touch ".to_vec();
    cmd.extend_from_slice(key);
    cmd.push(b' ');
    cmd.extend_from_slice(exptime.to_string().as_bytes());
    if noreply {
        cmd.extend_from_slice(b" noreply");
    }
    cmd.extend_from_slice(b"\r\n");
    cmd
}
408
409fn build_stats_cmd(arg: StatsArg) -> Vec<u8> {
410 let a: &[u8] = match arg {
411 StatsArg::Empty => b"",
412 StatsArg::Settings => b" settings",
413 StatsArg::Items => b" items",
414 StatsArg::Sizes => b" sizes",
415 StatsArg::Slabs => b" slabs",
416 StatsArg::Conns => b" conns",
417 };
418 [b"stats", a, b"\r\n"].concat()
419}
420
421fn build_slabs_automove_cmd(arg: SlabsAutomoveArg) -> Vec<u8> {
422 let a: &[u8] = match arg {
423 SlabsAutomoveArg::Zero => b"0",
424 SlabsAutomoveArg::One => b"1",
425 SlabsAutomoveArg::Two => b"2",
426 };
427 [b"slabs automove ", a, b"\r\n"].concat()
428}
429
430fn build_lru_crawler_cmd(arg: LruCrawlerArg) -> &'static [u8] {
431 match arg {
432 LruCrawlerArg::Enable => b"lru_crawler enable\r\n",
433 LruCrawlerArg::Disable => b"lru_crawler disable\r\n",
434 }
435}
436
/// Serializes `lru_crawler sleep <microseconds>\r\n`.
fn build_lru_clawler_sleep_cmd(microseconds: usize) -> Vec<u8> {
    format!("lru_crawler sleep {microseconds}\r\n").into_bytes()
}
445
/// Serializes `lru_crawler tocrawl <arg>\r\n`.
fn build_lru_crawler_tocrawl_cmd(arg: u32) -> Vec<u8> {
    format!("lru_crawler tocrawl {arg}\r\n").into_bytes()
}
449
450fn build_lru_clawler_crawl_cmd(arg: LruCrawlerCrawlArg) -> Vec<u8> {
451 let a = match arg {
452 LruCrawlerCrawlArg::Classids(ids) => ids
453 .iter()
454 .map(|x| x.to_string())
455 .collect::<Vec<_>>()
456 .join(","),
457 LruCrawlerCrawlArg::All => "all".to_string(),
458 };
459 [b"lru_crawler crawl ", a.as_bytes(), b"\r\n"].concat()
460}
461
/// Serializes `slabs reassign <source_class> <dest_class>\r\n`.
fn build_slabs_reassign_cmd(source_class: usize, dest_class: usize) -> Vec<u8> {
    format!("slabs reassign {source_class} {dest_class}\r\n").into_bytes()
}
472
473fn build_lru_clawler_metadump_cmd(arg: LruCrawlerMetadumpArg) -> Vec<u8> {
474 let a = match arg {
475 LruCrawlerMetadumpArg::Classids(ids) => ids
476 .iter()
477 .map(|x| x.to_string())
478 .collect::<Vec<_>>()
479 .join(","),
480 LruCrawlerMetadumpArg::All => "all".to_string(),
481 LruCrawlerMetadumpArg::Hash => "hash".to_string(),
482 };
483 [b"lru_crawler metadump ", a.as_bytes(), b"\r\n"].concat()
484}
485
486fn build_lru_clawler_mgdump_cmd(arg: LruCrawlerMgdumpArg) -> Vec<u8> {
487 let a = match arg {
488 LruCrawlerMgdumpArg::Classids(ids) => ids
489 .iter()
490 .map(|x| x.to_string())
491 .collect::<Vec<_>>()
492 .join(","),
493 LruCrawlerMgdumpArg::All => "all".to_string(),
494 LruCrawlerMgdumpArg::Hash => "hash".to_string(),
495 };
496 [b"lru_crawler mgdump ", a.as_bytes(), b"\r\n"].concat()
497}
498
/// Returns the wire form of the `mn` command.
fn build_mn_cmd() -> &'static [u8] {
    const MN: &[u8] = b"mn\r\n";
    MN
}
502
/// Serializes `me <key>\r\n`.
fn build_me_cmd(key: &[u8]) -> Vec<u8> {
    let mut cmd = b"me ".to_vec();
    cmd.extend_from_slice(key);
    cmd.extend_from_slice(b"\r\n");
    cmd
}
506
507async fn version_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<String> {
508 s.write_all(build_version_cmd()).await?;
509 s.flush().await?;
510 parse_version_rp(s).await
511}
512
513async fn quit_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(s: &mut S) -> io::Result<()> {
514 s.write_all(build_quit_cmd()).await?;
515 s.flush().await?;
516 Ok(())
517}
518
519async fn shutdown_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
520 s: &mut S,
521 graceful: bool,
522) -> io::Result<()> {
523 s.write_all(build_shutdown_cmd(graceful)).await?;
524 s.flush().await?;
525 Ok(())
526}
527
528async fn cache_memlimit_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
529 s: &mut S,
530 limit: usize,
531 noreply: bool,
532) -> io::Result<()> {
533 s.write_all(&build_cache_memlimit_cmd(limit, noreply))
534 .await?;
535 s.flush().await?;
536 parse_ok_rp(s, noreply).await
537}
538
539async fn flush_all_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
540 s: &mut S,
541 exptime: Option<i64>,
542 noreply: bool,
543) -> io::Result<()> {
544 s.write_all(&build_flush_all_cmd(exptime, noreply)).await?;
545 s.flush().await?;
546 parse_ok_rp(s, noreply).await
547}
548
549async fn storage_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
550 s: &mut S,
551 command_name: &[u8],
552 key: &[u8],
553 flags: u32,
554 exptime: i64,
555 cas_unique: Option<u64>,
556 noreply: bool,
557 data_block: &[u8],
558) -> io::Result<bool> {
559 s.write_all(&build_storage_cmd(
560 command_name,
561 key,
562 flags,
563 exptime,
564 cas_unique,
565 noreply,
566 data_block,
567 ))
568 .await?;
569 s.flush().await?;
570 parse_storage_rp(s, noreply).await
571}
572
573async fn delete_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
574 s: &mut S,
575 key: &[u8],
576 noreply: bool,
577) -> io::Result<bool> {
578 s.write_all(&build_delete_cmd(key, noreply)).await?;
579 s.flush().await?;
580 parse_delete_rp(s, noreply).await
581}
582
583async fn auth_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
584 s: &mut S,
585 username: &[u8],
586 password: &[u8],
587) -> io::Result<()> {
588 s.write_all(&build_auth_cmd(username, password)).await?;
589 s.flush().await?;
590 parse_auth_rp(s).await
591}
592
593async fn incr_decr_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
594 s: &mut S,
595 command_name: &[u8],
596 key: &[u8],
597 value: u64,
598 noreply: bool,
599) -> io::Result<Option<u64>> {
600 s.write_all(&build_incr_decr_cmd(command_name, key, value, noreply))
601 .await?;
602 s.flush().await?;
603 parse_incr_decr_rp(s, noreply).await
604}
605
606async fn touch_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
607 s: &mut S,
608 key: &[u8],
609 exptime: i64,
610 noreply: bool,
611) -> io::Result<bool> {
612 s.write_all(&build_touch_cmd(key, exptime, noreply)).await?;
613 s.flush().await?;
614 parse_touch_rp(s, noreply).await
615}
616
617async fn retrieval_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
618 s: &mut S,
619 command_name: &[u8],
620 exptime: Option<i64>,
621 keys: &[&[u8]],
622) -> io::Result<Vec<Item>> {
623 s.write_all(&build_retrieval_cmd(command_name, exptime, keys))
624 .await?;
625 s.flush().await?;
626 parse_retrieval_rp(s).await
627}
628
629async fn stats_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
630 s: &mut S,
631 arg: StatsArg,
632) -> io::Result<HashMap<String, String>> {
633 s.write_all(&build_stats_cmd(arg)).await?;
634 s.flush().await?;
635 parse_stats_rp(s).await
636}
637
638async fn slabs_automove_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
639 s: &mut S,
640 arg: SlabsAutomoveArg,
641) -> io::Result<()> {
642 s.write_all(&build_slabs_automove_cmd(arg)).await?;
643 s.flush().await?;
644 parse_ok_rp(s, false).await
645}
646
647async fn lru_crawler_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
648 s: &mut S,
649 arg: LruCrawlerArg,
650) -> io::Result<()> {
651 s.write_all(build_lru_crawler_cmd(arg)).await?;
652 s.flush().await?;
653 parse_ok_rp(s, false).await
654}
655
656async fn lru_crawler_sleep_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
657 s: &mut S,
658 microseconds: usize,
659) -> io::Result<()> {
660 s.write_all(&build_lru_clawler_sleep_cmd(microseconds))
661 .await?;
662 s.flush().await?;
663 parse_ok_rp(s, false).await
664}
665
666async fn lru_crawler_tocrawl_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
667 s: &mut S,
668 arg: u32,
669) -> io::Result<()> {
670 s.write_all(&build_lru_crawler_tocrawl_cmd(arg)).await?;
671 s.flush().await?;
672 parse_ok_rp(s, false).await
673}
674
675async fn lru_crawler_crawl_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
676 s: &mut S,
677 arg: LruCrawlerCrawlArg<'_>,
678) -> io::Result<()> {
679 s.write_all(&build_lru_clawler_crawl_cmd(arg)).await?;
680 s.flush().await?;
681 parse_ok_rp(s, false).await
682}
683
684async fn slabs_reassign_cmd<S: AsyncBufRead + AsyncWrite + Unpin>(
685 s: &mut S,
686 source_class: usize,
687 dest_class: usize,
688) -> io::Result<()> {
689 s.write_all(&build_slabs_reassign_cmd(source_class, dest_class))
690 .await?;
691 s.flush().await?;
692 parse_ok_rp(s, false).await
693}
694
695async fn lru_crawler_metadump_cmd<S>(
696 s: &mut S,
697 arg: LruCrawlerMetadumpArg<'_>,
698) -> io::Result<Vec<String>>
699where
700 S: AsyncBufRead + AsyncWrite + Unpin,
701{
702 s.write_all(&build_lru_clawler_metadump_cmd(arg)).await?;
703 s.flush().await?;
704 parse_lru_crawler_metadump_rp(s).await
705}
706
707async fn lru_crawler_mgdump_cmd<S>(
708 s: &mut S,
709 arg: LruCrawlerMgdumpArg<'_>,
710) -> io::Result<Vec<String>>
711where
712 S: AsyncBufRead + AsyncWrite + Unpin,
713{
714 s.write_all(&build_lru_clawler_mgdump_cmd(arg)).await?;
715 s.flush().await?;
716 parse_lru_crawler_mgdump_rp(s).await
717}
718
719async fn mn_cmd<S>(s: &mut S) -> io::Result<()>
720where
721 S: AsyncBufRead + AsyncWrite + Unpin,
722{
723 s.write_all(build_mn_cmd()).await?;
724 s.flush().await?;
725 parse_mn_rp(s).await
726}
727
728async fn me_cmd<S>(s: &mut S, key: &[u8]) -> io::Result<Option<String>>
729where
730 S: AsyncBufRead + AsyncWrite + Unpin,
731{
732 s.write_all(&build_me_cmd(key)).await?;
733 s.flush().await?;
734 parse_me_r(s).await
735}
736
737async fn execute_cmd<S>(s: &mut S, cmds: &[Vec<u8>]) -> io::Result<Vec<PipelineResponse>>
738where
739 S: AsyncBufRead + AsyncWrite + Unpin,
740{
741 s.write_all(&cmds.concat()).await?;
742 s.flush().await?;
743 let mut result = Vec::new();
744 for cmd in cmds {
745 if cmd.starts_with(b"gets ")
746 || cmd.starts_with(b"get ")
747 || cmd.starts_with(b"gats ")
748 || cmd.starts_with(b"gat ")
749 {
750 parse_retrieval_rp(s)
751 .await?
752 .into_iter()
753 .for_each(|x| result.push(PipelineResponse::Item(x)))
754 } else if cmd.starts_with(b"set _ _ _ ") {
755 result.push(PipelineResponse::Unit(parse_auth_rp(s).await?));
756 } else if cmd.starts_with(b"set ")
757 || cmd.starts_with(b"add ")
758 || cmd.starts_with(b"replace ")
759 || cmd.starts_with(b"append ")
760 || cmd.starts_with(b"prepend ")
761 || cmd.starts_with(b"cas ")
762 {
763 let mut split = cmd.split(|x| x == &b'\r');
764 let n = split.next().unwrap();
765 result.push(PipelineResponse::Bool(
766 parse_storage_rp(s, n.ends_with(b"noreply")).await?,
767 ))
768 } else if cmd == build_version_cmd() {
769 result.push(PipelineResponse::String(parse_version_rp(s).await?));
770 } else if cmd.starts_with(b"delete ") {
771 result.push(PipelineResponse::Bool(
772 parse_delete_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
773 ))
774 } else if cmd.starts_with(b"incr ") || cmd.starts_with(b"decr ") {
775 result.push(PipelineResponse::Value(
776 parse_incr_decr_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
777 ))
778 } else if cmd.starts_with(b"touch ") {
779 result.push(PipelineResponse::Bool(
780 parse_touch_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
781 ))
782 } else if cmd == build_quit_cmd() || cmd.starts_with(b"shutdown") {
783 result.push(PipelineResponse::Unit(()));
784 } else if cmd.starts_with(b"flush_all") || cmd.starts_with(b"cache_memlimit ") {
785 result.push(PipelineResponse::Unit(
786 parse_ok_rp(s, cmd.ends_with(b"noreply\r\n")).await?,
787 ))
788 } else if cmd.starts_with(b"slabs automove ")
789 || cmd.starts_with(b"slabs reassign ")
790 || cmd.starts_with(b"lru_crawler sleep ")
791 || cmd.starts_with(b"lru_crawler crawl ")
792 || cmd.starts_with(b"lru_crawler tocrawl ")
793 || cmd == build_lru_crawler_cmd(LruCrawlerArg::Enable)
794 || cmd == build_lru_crawler_cmd(LruCrawlerArg::Disable)
795 {
796 result.push(PipelineResponse::Unit(parse_ok_rp(s, false).await?))
797 } else if cmd == build_mn_cmd() {
798 result.push(PipelineResponse::Unit(parse_mn_rp(s).await?))
799 } else if cmd.starts_with(b"stats") {
800 result.push(PipelineResponse::HashMap(parse_stats_rp(s).await?))
801 } else if cmd.starts_with(b"lru_crawler metadump ") {
802 result.push(PipelineResponse::VecString(
803 parse_lru_crawler_metadump_rp(s).await?,
804 ))
805 } else if cmd.starts_with(b"lru_crawler mgdump ") {
806 result.push(PipelineResponse::VecString(
807 parse_lru_crawler_mgdump_rp(s).await?,
808 ))
809 } else if cmd.starts_with(b"me ") {
810 result.push(PipelineResponse::OptionString(parse_me_r(s).await?))
811 }
812 }
813 Ok(result)
814}
815
816pub enum Connection {
817 Tcp(BufReader<TcpStream>),
818 Unix(BufReader<UnixStream>),
819 Udp(UdpSocket),
820}
821impl Connection {
822 pub async fn default() -> io::Result<Self> {
834 Ok(Connection::Tcp(BufReader::new(
835 TcpStream::connect("127.0.0.1:11211").await?,
836 )))
837 }
838
839 pub async fn tcp_connect(addr: &str) -> io::Result<Self> {
851 Ok(Connection::Tcp(BufReader::new(
852 TcpStream::connect(addr).await?,
853 )))
854 }
855
856 pub async fn unix_connect(path: &str) -> io::Result<Self> {
868 Ok(Connection::Unix(BufReader::new(
869 UnixStream::connect(path).await?,
870 )))
871 }
872
873 pub async fn version(&mut self) -> io::Result<String> {
887 match self {
888 Connection::Tcp(s) => version_cmd(s).await,
889 Connection::Unix(s) => version_cmd(s).await,
890 Connection::Udp(_s) => todo!(),
891 }
892 }
893
894 pub async fn quit(&mut self) -> io::Result<()> {
907 match self {
908 Connection::Tcp(s) => quit_cmd(s).await,
909 Connection::Unix(s) => quit_cmd(s).await,
910 Connection::Udp(_s) => todo!(),
911 }
912 }
913
914 pub async fn shutdown(&mut self, graceful: bool) -> io::Result<()> {
927 match self {
928 Connection::Tcp(s) => shutdown_cmd(s, graceful).await,
929 Connection::Unix(s) => shutdown_cmd(s, graceful).await,
930 Connection::Udp(_s) => todo!(),
931 }
932 }
933
934 pub async fn cache_memlimit(&mut self, limit: usize, noreply: bool) -> io::Result<()> {
947 match self {
948 Connection::Tcp(s) => cache_memlimit_cmd(s, limit, noreply).await,
949 Connection::Unix(s) => cache_memlimit_cmd(s, limit, noreply).await,
950 Connection::Udp(_s) => todo!(),
951 }
952 }
953
954 pub async fn flush_all(&mut self, exptime: Option<i64>, noreply: bool) -> io::Result<()> {
967 match self {
968 Connection::Tcp(s) => flush_all_cmd(s, exptime, noreply).await,
969 Connection::Unix(s) => flush_all_cmd(s, exptime, noreply).await,
970 Connection::Udp(_s) => todo!(),
971 }
972 }
973
974 pub async fn set(
988 &mut self,
989 key: impl AsRef<[u8]>,
990 flags: u32,
991 exptime: i64,
992 noreply: bool,
993 data_block: impl AsRef<[u8]>,
994 ) -> io::Result<bool> {
995 match self {
996 Connection::Tcp(s) => {
997 storage_cmd(
998 s,
999 b"set",
1000 key.as_ref(),
1001 flags,
1002 exptime,
1003 None,
1004 noreply,
1005 data_block.as_ref(),
1006 )
1007 .await
1008 }
1009 Connection::Unix(s) => {
1010 storage_cmd(
1011 s,
1012 b"set",
1013 key.as_ref(),
1014 flags,
1015 exptime,
1016 None,
1017 noreply,
1018 data_block.as_ref(),
1019 )
1020 .await
1021 }
1022 Connection::Udp(_s) => todo!(),
1023 }
1024 }
1025
1026 pub async fn add(
1040 &mut self,
1041 key: impl AsRef<[u8]>,
1042 flags: u32,
1043 exptime: i64,
1044 noreply: bool,
1045 data_block: impl AsRef<[u8]>,
1046 ) -> io::Result<bool> {
1047 match self {
1048 Connection::Tcp(s) => {
1049 storage_cmd(
1050 s,
1051 b"add",
1052 key.as_ref(),
1053 flags,
1054 exptime,
1055 None,
1056 noreply,
1057 data_block.as_ref(),
1058 )
1059 .await
1060 }
1061 Connection::Unix(s) => {
1062 storage_cmd(
1063 s,
1064 b"add",
1065 key.as_ref(),
1066 flags,
1067 exptime,
1068 None,
1069 noreply,
1070 data_block.as_ref(),
1071 )
1072 .await
1073 }
1074 Connection::Udp(_s) => todo!(),
1075 }
1076 }
1077
1078 pub async fn replace(
1092 &mut self,
1093 key: impl AsRef<[u8]>,
1094 flags: u32,
1095 exptime: i64,
1096 noreply: bool,
1097 data_block: impl AsRef<[u8]>,
1098 ) -> io::Result<bool> {
1099 match self {
1100 Connection::Tcp(s) => {
1101 storage_cmd(
1102 s,
1103 b"replace",
1104 key.as_ref(),
1105 flags,
1106 exptime,
1107 None,
1108 noreply,
1109 data_block.as_ref(),
1110 )
1111 .await
1112 }
1113 Connection::Unix(s) => {
1114 storage_cmd(
1115 s,
1116 b"replace",
1117 key.as_ref(),
1118 flags,
1119 exptime,
1120 None,
1121 noreply,
1122 data_block.as_ref(),
1123 )
1124 .await
1125 }
1126 Connection::Udp(_s) => todo!(),
1127 }
1128 }
1129
1130 pub async fn append(
1144 &mut self,
1145 key: impl AsRef<[u8]>,
1146 flags: u32,
1147 exptime: i64,
1148 noreply: bool,
1149 data_block: impl AsRef<[u8]>,
1150 ) -> io::Result<bool> {
1151 match self {
1152 Connection::Tcp(s) => {
1153 storage_cmd(
1154 s,
1155 b"append",
1156 key.as_ref(),
1157 flags,
1158 exptime,
1159 None,
1160 noreply,
1161 data_block.as_ref(),
1162 )
1163 .await
1164 }
1165 Connection::Unix(s) => {
1166 storage_cmd(
1167 s,
1168 b"append",
1169 key.as_ref(),
1170 flags,
1171 exptime,
1172 None,
1173 noreply,
1174 data_block.as_ref(),
1175 )
1176 .await
1177 }
1178 Connection::Udp(_s) => todo!(),
1179 }
1180 }
1181
1182 pub async fn prepend(
1196 &mut self,
1197 key: impl AsRef<[u8]>,
1198 flags: u32,
1199 exptime: i64,
1200 noreply: bool,
1201 data_block: impl AsRef<[u8]>,
1202 ) -> io::Result<bool> {
1203 match self {
1204 Connection::Tcp(s) => {
1205 storage_cmd(
1206 s,
1207 b"prepend",
1208 key.as_ref(),
1209 flags,
1210 exptime,
1211 None,
1212 noreply,
1213 data_block.as_ref(),
1214 )
1215 .await
1216 }
1217 Connection::Unix(s) => {
1218 storage_cmd(
1219 s,
1220 b"prepend",
1221 key.as_ref(),
1222 flags,
1223 exptime,
1224 None,
1225 noreply,
1226 data_block.as_ref(),
1227 )
1228 .await
1229 }
1230 Connection::Udp(_s) => todo!(),
1231 }
1232 }
1233
1234 pub async fn cas(
1248 &mut self,
1249 key: impl AsRef<[u8]>,
1250 flags: u32,
1251 exptime: i64,
1252 cas_unique: u64,
1253 noreply: bool,
1254 data_block: impl AsRef<[u8]>,
1255 ) -> io::Result<bool> {
1256 match self {
1257 Connection::Tcp(s) => {
1258 storage_cmd(
1259 s,
1260 b"cas",
1261 key.as_ref(),
1262 flags,
1263 exptime,
1264 Some(cas_unique),
1265 noreply,
1266 data_block.as_ref(),
1267 )
1268 .await
1269 }
1270 Connection::Unix(s) => {
1271 storage_cmd(
1272 s,
1273 b"cas",
1274 key.as_ref(),
1275 flags,
1276 exptime,
1277 Some(cas_unique),
1278 noreply,
1279 data_block.as_ref(),
1280 )
1281 .await
1282 }
1283 Connection::Udp(_s) => todo!(),
1284 }
1285 }
1286
1287 pub async fn auth(
1300 &mut self,
1301 username: impl AsRef<[u8]>,
1302 password: impl AsRef<[u8]>,
1303 ) -> io::Result<()> {
1304 match self {
1305 Connection::Tcp(s) => auth_cmd(s, username.as_ref(), password.as_ref()).await,
1306 Connection::Unix(s) => auth_cmd(s, username.as_ref(), password.as_ref()).await,
1307 Connection::Udp(_s) => todo!(),
1308 }
1309 }
1310
1311 pub async fn delete(&mut self, key: impl AsRef<[u8]>, noreply: bool) -> io::Result<bool> {
1325 match self {
1326 Connection::Tcp(s) => delete_cmd(s, key.as_ref(), noreply).await,
1327 Connection::Unix(s) => delete_cmd(s, key.as_ref(), noreply).await,
1328 Connection::Udp(_s) => todo!(),
1329 }
1330 }
1331
1332 pub async fn incr(
1345 &mut self,
1346 key: impl AsRef<[u8]>,
1347 value: u64,
1348 noreply: bool,
1349 ) -> io::Result<Option<u64>> {
1350 match self {
1351 Connection::Tcp(s) => incr_decr_cmd(s, b"incr", key.as_ref(), value, noreply).await,
1352 Connection::Unix(s) => incr_decr_cmd(s, b"incr", key.as_ref(), value, noreply).await,
1353 Connection::Udp(_s) => todo!(),
1354 }
1355 }
1356
1357 pub async fn decr(
1371 &mut self,
1372 key: impl AsRef<[u8]>,
1373 value: u64,
1374 noreply: bool,
1375 ) -> io::Result<Option<u64>> {
1376 match self {
1377 Connection::Tcp(s) => incr_decr_cmd(s, b"decr", key.as_ref(), value, noreply).await,
1378 Connection::Unix(s) => incr_decr_cmd(s, b"decr", key.as_ref(), value, noreply).await,
1379 Connection::Udp(_s) => todo!(),
1380 }
1381 }
1382
1383 pub async fn touch(
1397 &mut self,
1398 key: impl AsRef<[u8]>,
1399 exptime: i64,
1400 noreply: bool,
1401 ) -> io::Result<bool> {
1402 match self {
1403 Connection::Tcp(s) => touch_cmd(s, key.as_ref(), exptime, noreply).await,
1404 Connection::Unix(s) => touch_cmd(s, key.as_ref(), exptime, noreply).await,
1405 Connection::Udp(_s) => todo!(),
1406 }
1407 }
1408
1409 pub async fn get(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1424 match self {
1425 Connection::Tcp(s) => Ok(retrieval_cmd(s, b"get", None, &[key.as_ref()]).await?.pop()),
1426 Connection::Unix(s) => Ok(retrieval_cmd(s, b"get", None, &[key.as_ref()]).await?.pop()),
1427 Connection::Udp(_s) => todo!(),
1428 }
1429 }
1430
1431 pub async fn gets(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1446 match self {
1447 Connection::Tcp(s) => Ok(retrieval_cmd(s, b"gets", None, &[key.as_ref()])
1448 .await?
1449 .pop()),
1450 Connection::Unix(s) => Ok(retrieval_cmd(s, b"gets", None, &[key.as_ref()])
1451 .await?
1452 .pop()),
1453 Connection::Udp(_s) => todo!(),
1454 }
1455 }
1456
1457 pub async fn gat(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1472 match self {
1473 Connection::Tcp(s) => Ok(retrieval_cmd(s, b"gat", Some(exptime), &[key.as_ref()])
1474 .await?
1475 .pop()),
1476 Connection::Unix(s) => Ok(retrieval_cmd(s, b"gat", Some(exptime), &[key.as_ref()])
1477 .await?
1478 .pop()),
1479 Connection::Udp(_s) => todo!(),
1480 }
1481 }
1482
1483 pub async fn gats(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1498 match self {
1499 Connection::Tcp(s) => Ok(retrieval_cmd(s, b"gats", Some(exptime), &[key.as_ref()])
1500 .await?
1501 .pop()),
1502 Connection::Unix(s) => Ok(retrieval_cmd(s, b"gats", Some(exptime), &[key.as_ref()])
1503 .await?
1504 .pop()),
1505 Connection::Udp(_s) => todo!(),
1506 }
1507 }
1508
1509 pub async fn get_multi(&mut self, keys: &[impl AsRef<[u8]>]) -> io::Result<Vec<Item>> {
1524 match self {
1525 Connection::Tcp(s) => {
1526 retrieval_cmd(
1527 s,
1528 b"get",
1529 None,
1530 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1531 )
1532 .await
1533 }
1534 Connection::Unix(s) => {
1535 retrieval_cmd(
1536 s,
1537 b"get",
1538 None,
1539 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1540 )
1541 .await
1542 }
1543 Connection::Udp(_s) => todo!(),
1544 }
1545 }
1546
1547 pub async fn gets_multi(&mut self, keys: &[impl AsRef<[u8]>]) -> io::Result<Vec<Item>> {
1562 match self {
1563 Connection::Tcp(s) => {
1564 retrieval_cmd(
1565 s,
1566 b"gets",
1567 None,
1568 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1569 )
1570 .await
1571 }
1572 Connection::Unix(s) => {
1573 retrieval_cmd(
1574 s,
1575 b"gets",
1576 None,
1577 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1578 )
1579 .await
1580 }
1581 Connection::Udp(_s) => todo!(),
1582 }
1583 }
1584
1585 pub async fn gat_multi(
1600 &mut self,
1601 exptime: i64,
1602 keys: &[impl AsRef<[u8]>],
1603 ) -> io::Result<Vec<Item>> {
1604 match self {
1605 Connection::Tcp(s) => {
1606 retrieval_cmd(
1607 s,
1608 b"gat",
1609 Some(exptime),
1610 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1611 )
1612 .await
1613 }
1614 Connection::Unix(s) => {
1615 retrieval_cmd(
1616 s,
1617 b"gat",
1618 Some(exptime),
1619 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1620 )
1621 .await
1622 }
1623 Connection::Udp(_s) => todo!(),
1624 }
1625 }
1626
1627 pub async fn gats_multi(
1642 &mut self,
1643 exptime: i64,
1644 keys: &[impl AsRef<[u8]>],
1645 ) -> io::Result<Vec<Item>> {
1646 match self {
1647 Connection::Tcp(s) => {
1648 retrieval_cmd(
1649 s,
1650 b"gats",
1651 Some(exptime),
1652 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1653 )
1654 .await
1655 }
1656 Connection::Unix(s) => {
1657 retrieval_cmd(
1658 s,
1659 b"gats",
1660 Some(exptime),
1661 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
1662 )
1663 .await
1664 }
1665 Connection::Udp(_s) => todo!(),
1666 }
1667 }
1668
1669 pub async fn stats(&mut self, arg: StatsArg) -> io::Result<HashMap<String, String>> {
1684 match self {
1685 Connection::Tcp(s) => stats_cmd(s, arg).await,
1686 Connection::Unix(s) => stats_cmd(s, arg).await,
1687 Connection::Udp(_s) => todo!(),
1688 }
1689 }
1690
1691 pub async fn slabs_automove(&mut self, arg: SlabsAutomoveArg) -> io::Result<()> {
1705 match self {
1706 Connection::Tcp(s) => slabs_automove_cmd(s, arg).await,
1707 Connection::Unix(s) => slabs_automove_cmd(s, arg).await,
1708 Connection::Udp(_s) => todo!(),
1709 }
1710 }
1711
1712 pub async fn lru_crawler(&mut self, arg: LruCrawlerArg) -> io::Result<()> {
1726 match self {
1727 Connection::Tcp(s) => lru_crawler_cmd(s, arg).await,
1728 Connection::Unix(s) => lru_crawler_cmd(s, arg).await,
1729 Connection::Udp(_s) => todo!(),
1730 }
1731 }
1732
1733 pub async fn lru_crawler_sleep(&mut self, microseconds: usize) -> io::Result<()> {
1746 match self {
1747 Connection::Tcp(s) => lru_crawler_sleep_cmd(s, microseconds).await,
1748 Connection::Unix(s) => lru_crawler_sleep_cmd(s, microseconds).await,
1749 Connection::Udp(_s) => todo!(),
1750 }
1751 }
1752
1753 pub async fn lru_crawler_tocrawl(&mut self, arg: u32) -> io::Result<()> {
1766 match self {
1767 Connection::Tcp(s) => lru_crawler_tocrawl_cmd(s, arg).await,
1768 Connection::Unix(s) => lru_crawler_tocrawl_cmd(s, arg).await,
1769 Connection::Udp(_s) => todo!(),
1770 }
1771 }
1772
1773 pub async fn lru_crawler_crawl(&mut self, arg: LruCrawlerCrawlArg<'_>) -> io::Result<()> {
1786 match self {
1787 Connection::Tcp(s) => lru_crawler_crawl_cmd(s, arg).await,
1788 Connection::Unix(s) => lru_crawler_crawl_cmd(s, arg).await,
1789 Connection::Udp(_s) => todo!(),
1790 }
1791 }
1792
1793 pub async fn slabs_reassign(
1807 &mut self,
1808 source_class: usize,
1809 dest_class: usize,
1810 ) -> io::Result<()> {
1811 match self {
1812 Connection::Tcp(s) => slabs_reassign_cmd(s, source_class, dest_class).await,
1813 Connection::Unix(s) => slabs_reassign_cmd(s, source_class, dest_class).await,
1814 Connection::Udp(_s) => todo!(),
1815 }
1816 }
1817
1818 pub async fn lru_crawler_metadump(
1832 &mut self,
1833 arg: LruCrawlerMetadumpArg<'_>,
1834 ) -> io::Result<Vec<String>> {
1835 match self {
1836 Connection::Tcp(s) => lru_crawler_metadump_cmd(s, arg).await,
1837 Connection::Unix(s) => lru_crawler_metadump_cmd(s, arg).await,
1838 Connection::Udp(_s) => todo!(),
1839 }
1840 }
1841
1842 pub async fn lru_crawler_mgdump(
1856 &mut self,
1857 arg: LruCrawlerMgdumpArg<'_>,
1858 ) -> io::Result<Vec<String>> {
1859 match self {
1860 Connection::Tcp(s) => lru_crawler_mgdump_cmd(s, arg).await,
1861 Connection::Unix(s) => lru_crawler_mgdump_cmd(s, arg).await,
1862 Connection::Udp(_s) => todo!(),
1863 }
1864 }
1865
1866 pub async fn mn(&mut self) -> io::Result<()> {
1879 match self {
1880 Connection::Tcp(s) => mn_cmd(s).await,
1881 Connection::Unix(s) => mn_cmd(s).await,
1882 Connection::Udp(_s) => todo!(),
1883 }
1884 }
1885
1886 pub async fn me(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<String>> {
1900 match self {
1901 Connection::Tcp(s) => me_cmd(s, key.as_ref()).await,
1902 Connection::Unix(s) => me_cmd(s, key.as_ref()).await,
1903 Connection::Udp(_s) => todo!(),
1904 }
1905 }
1906
1907 pub fn pipeline(&mut self) -> Pipeline {
1908 Pipeline::new(self)
1909 }
1910}
1911
1912pub struct ClientCrc32(Vec<Connection>);
1913impl ClientCrc32 {
1914 pub fn new(conns: Vec<Connection>) -> Self {
1931 Self(conns)
1932 }
1933
1934 pub async fn get(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1954 let size = self.0.len();
1955 self.0[crc32fast::hash(key.as_ref()) as usize % size]
1956 .get(key.as_ref())
1957 .await
1958 }
1959
1960 pub async fn gets(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
1980 let size = self.0.len();
1981 self.0[crc32fast::hash(key.as_ref()) as usize % size]
1982 .gets(key.as_ref())
1983 .await
1984 }
1985
1986 pub async fn gat(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
2001 let size = self.0.len();
2002 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2003 .gat(exptime, key.as_ref())
2004 .await
2005 }
2006
2007 pub async fn gats(&mut self, exptime: i64, key: impl AsRef<[u8]>) -> io::Result<Option<Item>> {
2022 let size = self.0.len();
2023 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2024 .gats(exptime, key.as_ref())
2025 .await
2026 }
2027
2028 pub async fn set(
2047 &mut self,
2048 key: impl AsRef<[u8]>,
2049 flags: u32,
2050 exptime: i64,
2051 noreply: bool,
2052 data_block: impl AsRef<[u8]>,
2053 ) -> io::Result<bool> {
2054 let size = self.0.len();
2055 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2056 .set(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2057 .await
2058 }
2059
2060 pub async fn add(
2079 &mut self,
2080 key: impl AsRef<[u8]>,
2081 flags: u32,
2082 exptime: i64,
2083 noreply: bool,
2084 data_block: impl AsRef<[u8]>,
2085 ) -> io::Result<bool> {
2086 let size = self.0.len();
2087 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2088 .add(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2089 .await
2090 }
2091
2092 pub async fn replace(
2111 &mut self,
2112 key: impl AsRef<[u8]>,
2113 flags: u32,
2114 exptime: i64,
2115 noreply: bool,
2116 data_block: impl AsRef<[u8]>,
2117 ) -> io::Result<bool> {
2118 let size = self.0.len();
2119 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2120 .replace(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2121 .await
2122 }
2123
2124 pub async fn append(
2143 &mut self,
2144 key: impl AsRef<[u8]>,
2145 flags: u32,
2146 exptime: i64,
2147 noreply: bool,
2148 data_block: impl AsRef<[u8]>,
2149 ) -> io::Result<bool> {
2150 let size = self.0.len();
2151 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2152 .append(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2153 .await
2154 }
2155
2156 pub async fn prepend(
2175 &mut self,
2176 key: impl AsRef<[u8]>,
2177 flags: u32,
2178 exptime: i64,
2179 noreply: bool,
2180 data_block: impl AsRef<[u8]>,
2181 ) -> io::Result<bool> {
2182 let size = self.0.len();
2183 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2184 .prepend(key.as_ref(), flags, exptime, noreply, data_block.as_ref())
2185 .await
2186 }
2187
2188 pub async fn cas(
2207 &mut self,
2208 key: impl AsRef<[u8]>,
2209 flags: u32,
2210 exptime: i64,
2211 cas_unique: u64,
2212 noreply: bool,
2213 data_block: impl AsRef<[u8]>,
2214 ) -> io::Result<bool> {
2215 let size = self.0.len();
2216 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2217 .cas(
2218 key.as_ref(),
2219 flags,
2220 exptime,
2221 cas_unique,
2222 noreply,
2223 data_block.as_ref(),
2224 )
2225 .await
2226 }
2227
2228 pub async fn delete(&mut self, key: impl AsRef<[u8]>, noreply: bool) -> io::Result<bool> {
2247 let size = self.0.len();
2248 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2249 .delete(key.as_ref(), noreply)
2250 .await
2251 }
2252
2253 pub async fn incr(
2272 &mut self,
2273 key: impl AsRef<[u8]>,
2274 value: u64,
2275 noreply: bool,
2276 ) -> io::Result<Option<u64>> {
2277 let size = self.0.len();
2278 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2279 .incr(key.as_ref(), value, noreply)
2280 .await
2281 }
2282
2283 pub async fn decr(
2302 &mut self,
2303 key: impl AsRef<[u8]>,
2304 value: u64,
2305 noreply: bool,
2306 ) -> io::Result<Option<u64>> {
2307 let size = self.0.len();
2308 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2309 .decr(key.as_ref(), value, noreply)
2310 .await
2311 }
2312
2313 pub async fn touch(
2332 &mut self,
2333 key: impl AsRef<[u8]>,
2334 exptime: i64,
2335 noreply: bool,
2336 ) -> io::Result<bool> {
2337 let size = self.0.len();
2338 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2339 .touch(key.as_ref(), exptime, noreply)
2340 .await
2341 }
2342
2343 pub async fn me(&mut self, key: impl AsRef<[u8]>) -> io::Result<Option<String>> {
2362 let size = self.0.len();
2363 self.0[crc32fast::hash(key.as_ref()) as usize % size]
2364 .me(key.as_ref())
2365 .await
2366 }
2367}
2368
2369pub struct Pipeline<'a>(&'a mut Connection, Vec<Vec<u8>>);
2370impl<'a> Pipeline<'a> {
2371 fn new(conn: &'a mut Connection) -> Self {
2384 Self(conn, Vec::new())
2385 }
2386
2387 pub async fn execute(self) -> io::Result<Vec<PipelineResponse>> {
2400 if self.1.is_empty() {
2401 return Ok(Vec::new());
2402 };
2403 match self.0 {
2404 Connection::Tcp(s) => execute_cmd(s, &self.1).await,
2405 Connection::Unix(s) => execute_cmd(s, &self.1).await,
2406 Connection::Udp(_s) => todo!(),
2407 }
2408 }
2409
2410 pub fn version(mut self) -> Self {
2423 self.1.push(build_version_cmd().to_vec());
2424 self
2425 }
2426
2427 pub fn quit(mut self) -> Self {
2440 self.1.push(build_quit_cmd().to_vec());
2441 self
2442 }
2443
2444 pub fn shutdown(mut self, graceful: bool) -> Self {
2457 self.1.push(build_shutdown_cmd(graceful).to_vec());
2458 self
2459 }
2460
2461 pub fn cache_memlimit(mut self, limit: usize, noreply: bool) -> Self {
2474 self.1
2475 .push(build_cache_memlimit_cmd(limit, noreply).to_vec());
2476 self
2477 }
2478
2479 pub fn flush_all(mut self, exptime: Option<i64>, noreply: bool) -> Self {
2492 self.1.push(build_flush_all_cmd(exptime, noreply).to_vec());
2493 self
2494 }
2495
2496 pub fn set(
2509 mut self,
2510 key: impl AsRef<[u8]>,
2511 flags: u32,
2512 exptime: i64,
2513 noreply: bool,
2514 data_block: impl AsRef<[u8]>,
2515 ) -> Self {
2516 self.1.push(build_storage_cmd(
2517 b"set",
2518 key.as_ref(),
2519 flags,
2520 exptime,
2521 None,
2522 noreply,
2523 data_block.as_ref(),
2524 ));
2525 self
2526 }
2527
2528 pub fn add(
2541 mut self,
2542 key: impl AsRef<[u8]>,
2543 flags: u32,
2544 exptime: i64,
2545 noreply: bool,
2546 data_block: impl AsRef<[u8]>,
2547 ) -> Self {
2548 self.1.push(build_storage_cmd(
2549 b"add",
2550 key.as_ref(),
2551 flags,
2552 exptime,
2553 None,
2554 noreply,
2555 data_block.as_ref(),
2556 ));
2557 self
2558 }
2559
2560 pub fn replace(
2573 mut self,
2574 key: impl AsRef<[u8]>,
2575 flags: u32,
2576 exptime: i64,
2577 noreply: bool,
2578 data_block: impl AsRef<[u8]>,
2579 ) -> Self {
2580 self.1.push(build_storage_cmd(
2581 b"replace",
2582 key.as_ref(),
2583 flags,
2584 exptime,
2585 None,
2586 noreply,
2587 data_block.as_ref(),
2588 ));
2589 self
2590 }
2591
2592 pub fn append(
2605 mut self,
2606 key: impl AsRef<[u8]>,
2607 flags: u32,
2608 exptime: i64,
2609 noreply: bool,
2610 data_block: impl AsRef<[u8]>,
2611 ) -> Self {
2612 self.1.push(build_storage_cmd(
2613 b"append",
2614 key.as_ref(),
2615 flags,
2616 exptime,
2617 None,
2618 noreply,
2619 data_block.as_ref(),
2620 ));
2621 self
2622 }
2623
2624 pub fn prepend(
2637 mut self,
2638 key: impl AsRef<[u8]>,
2639 flags: u32,
2640 exptime: i64,
2641 noreply: bool,
2642 data_block: impl AsRef<[u8]>,
2643 ) -> Self {
2644 self.1.push(build_storage_cmd(
2645 b"prepend",
2646 key.as_ref(),
2647 flags,
2648 exptime,
2649 None,
2650 noreply,
2651 data_block.as_ref(),
2652 ));
2653 self
2654 }
2655
2656 pub fn cas(
2669 mut self,
2670 key: impl AsRef<[u8]>,
2671 flags: u32,
2672 exptime: i64,
2673 cas_unique: u64,
2674 noreply: bool,
2675 data_block: impl AsRef<[u8]>,
2676 ) -> Self {
2677 self.1.push(build_storage_cmd(
2678 b"cas",
2679 key.as_ref(),
2680 flags,
2681 exptime,
2682 Some(cas_unique),
2683 noreply,
2684 data_block.as_ref(),
2685 ));
2686 self
2687 }
2688
2689 pub fn auth(mut self, username: impl AsRef<[u8]>, password: impl AsRef<[u8]>) -> Self {
2702 self.1
2703 .push(build_auth_cmd(username.as_ref(), password.as_ref()));
2704 self
2705 }
2706
2707 pub fn delete(mut self, key: impl AsRef<[u8]>, noreply: bool) -> Self {
2720 self.1.push(build_delete_cmd(key.as_ref(), noreply));
2721 self
2722 }
2723
2724 pub fn incr(mut self, key: impl AsRef<[u8]>, value: u64, noreply: bool) -> Self {
2737 self.1
2738 .push(build_incr_decr_cmd(b"incr", key.as_ref(), value, noreply));
2739 self
2740 }
2741
2742 pub fn decr(mut self, key: impl AsRef<[u8]>, value: u64, noreply: bool) -> Self {
2755 self.1
2756 .push(build_incr_decr_cmd(b"decr", key.as_ref(), value, noreply));
2757 self
2758 }
2759
2760 pub fn touch(mut self, key: impl AsRef<[u8]>, exptime: i64, noreply: bool) -> Self {
2773 self.1.push(build_touch_cmd(key.as_ref(), exptime, noreply));
2774 self
2775 }
2776
2777 pub fn get(mut self, key: impl AsRef<[u8]>) -> Self {
2790 self.1
2791 .push(build_retrieval_cmd(b"get", None, &[key.as_ref()]));
2792 self
2793 }
2794
2795 pub fn gets(mut self, key: impl AsRef<[u8]>) -> Self {
2808 self.1
2809 .push(build_retrieval_cmd(b"gets", None, &[key.as_ref()]));
2810 self
2811 }
2812
2813 pub fn gat(mut self, exptime: i64, key: impl AsRef<[u8]>) -> Self {
2826 self.1
2827 .push(build_retrieval_cmd(b"gat", Some(exptime), &[key.as_ref()]));
2828 self
2829 }
2830
2831 pub fn gats(mut self, exptime: i64, key: impl AsRef<[u8]>) -> Self {
2844 self.1
2845 .push(build_retrieval_cmd(b"gats", Some(exptime), &[key.as_ref()]));
2846 self
2847 }
2848
2849 pub fn get_multi(mut self, keys: &[impl AsRef<[u8]>]) -> Self {
2862 self.1.push(build_retrieval_cmd(
2863 b"get",
2864 None,
2865 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2866 ));
2867 self
2868 }
2869
2870 pub fn gets_multi(mut self, keys: &[impl AsRef<[u8]>]) -> Self {
2883 self.1.push(build_retrieval_cmd(
2884 b"gets",
2885 None,
2886 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2887 ));
2888 self
2889 }
2890
2891 pub fn gat_multi(mut self, exptime: i64, keys: &[impl AsRef<[u8]>]) -> Self {
2904 self.1.push(build_retrieval_cmd(
2905 b"gat",
2906 Some(exptime),
2907 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2908 ));
2909 self
2910 }
2911
2912 pub fn gats_multi(mut self, exptime: i64, keys: &[impl AsRef<[u8]>]) -> Self {
2925 self.1.push(build_retrieval_cmd(
2926 b"gats",
2927 Some(exptime),
2928 &keys.iter().map(|x| x.as_ref()).collect::<Vec<&[u8]>>(),
2929 ));
2930 self
2931 }
2932
2933 pub fn stats(mut self, arg: StatsArg) -> Self {
2946 self.1.push(build_stats_cmd(arg));
2947 self
2948 }
2949
2950 pub fn slabs_automove(mut self, arg: SlabsAutomoveArg) -> Self {
2963 self.1.push(build_slabs_automove_cmd(arg));
2964 self
2965 }
2966
2967 pub fn lru_crawler(mut self, arg: LruCrawlerArg) -> Self {
2980 self.1.push(build_lru_crawler_cmd(arg).to_vec());
2981 self
2982 }
2983
2984 pub fn lru_crawler_sleep(mut self, microseconds: usize) -> Self {
2997 self.1.push(build_lru_clawler_sleep_cmd(microseconds));
2998 self
2999 }
3000
3001 pub fn lru_crawler_tocrawl(mut self, arg: u32) -> Self {
3014 self.1.push(build_lru_crawler_tocrawl_cmd(arg));
3015 self
3016 }
3017
3018 pub fn lru_crawler_crawl(mut self, arg: LruCrawlerCrawlArg<'_>) -> Self {
3031 self.1.push(build_lru_clawler_crawl_cmd(arg));
3032 self
3033 }
3034
3035 pub fn slabs_reassign(mut self, source_class: usize, dest_class: usize) -> Self {
3048 self.1
3049 .push(build_slabs_reassign_cmd(source_class, dest_class));
3050 self
3051 }
3052
3053 pub fn lru_crawler_metadump(mut self, arg: LruCrawlerMetadumpArg<'_>) -> Self {
3066 self.1.push(build_lru_clawler_metadump_cmd(arg));
3067 self
3068 }
3069
3070 pub fn lru_crawler_mgdump(mut self, arg: LruCrawlerMgdumpArg<'_>) -> Self {
3083 self.1.push(build_lru_clawler_mgdump_cmd(arg));
3084 self
3085 }
3086
3087 pub fn mn(mut self) -> Self {
3100 self.1.push(build_mn_cmd().to_vec());
3101 self
3102 }
3103
3104 pub fn me(mut self, key: impl AsRef<[u8]>) -> Self {
3117 self.1.push(build_me_cmd(key.as_ref()));
3118 self
3119 }
3120}
3121
#[cfg(test)]
mod tests {
    use super::*;
    use smol::{block_on, io::Cursor};

    /// Builds the in-memory stream used as a stand-in for a server connection.
    ///
    /// Each fixture holds the request bytes followed by the canned reply.
    /// Presumably the command under test writes over the leading request bytes
    /// so that reading resumes at the reply; confirm against the `*_cmd`
    /// implementations.
    fn wire(script: &[u8]) -> Cursor<Vec<u8>> {
        Cursor::new(script.to_vec())
    }

    #[test]
    fn test_version() {
        block_on(async {
            let mut conn = wire(b"version\r\nVERSION 1.2.3\r\n");
            assert_eq!("1.2.3", version_cmd(&mut conn).await.unwrap());

            let mut conn = wire(b"version\r\nERROR\r\n");
            assert!(version_cmd(&mut conn).await.is_err());
        })
    }

    #[test]
    fn test_quit() {
        block_on(async {
            let mut conn = wire(b"quit\r\n");
            assert!(quit_cmd(&mut conn).await.is_ok());
        })
    }

    #[test]
    fn test_shutdown() {
        block_on(async {
            let mut conn = wire(b"shutdown\r\n");
            assert!(shutdown_cmd(&mut conn, false).await.is_ok());

            let mut conn = wire(b"shutdown graceful\r\n");
            assert!(shutdown_cmd(&mut conn, true).await.is_ok());
        })
    }

    #[test]
    fn test_cache_memlimit() {
        block_on(async {
            let mut conn = wire(b"cache_memlimit 1\r\nOK\r\n");
            assert!(cache_memlimit_cmd(&mut conn, 1, false).await.is_ok());

            let mut conn = wire(b"cache_memlimit 1 noreply\r\n");
            assert!(cache_memlimit_cmd(&mut conn, 1, true).await.is_ok());

            let mut conn = wire(b"cache_memlimit 1\r\nERROR\r\n");
            assert!(cache_memlimit_cmd(&mut conn, 1, false).await.is_err());
        })
    }

    #[test]
    fn test_flush_all() {
        block_on(async {
            let mut conn = wire(b"flush_all\r\nOK\r\n");
            assert!(flush_all_cmd(&mut conn, None, false).await.is_ok());

            let mut conn = wire(b"flush_all 1 noreply\r\n");
            assert!(flush_all_cmd(&mut conn, Some(1), true).await.is_ok());

            let mut conn = wire(b"flush_all\r\nERROR\r\n");
            assert!(flush_all_cmd(&mut conn, None, false).await.is_err());
        })
    }

    #[test]
    fn test_storage() {
        block_on(async {
            // NOTE(review): the request part of these fixtures carries a length
            // field of 0 although the payload is five bytes long; it appears to
            // serve only as a placeholder of the right size - confirm.
            let mut conn = wire(b"cas key 0 0 0 0\r\nvalue\r\nSTORED\r\n");
            let stored =
                storage_cmd(&mut conn, b"cas", b"key", 0, 0, Some(0), false, b"value").await;
            assert!(stored.unwrap());

            let mut conn = wire(b"append key 0 0 0 noreply\r\nvalue\r\n");
            let stored =
                storage_cmd(&mut conn, b"append", b"key", 0, 0, None, true, b"value").await;
            assert!(stored.unwrap());

            let mut conn = wire(b"prepend key 0 0 0\r\nvalue\r\nNOT_STORED\r\n");
            let stored =
                storage_cmd(&mut conn, b"prepend", b"key", 0, 0, None, false, b"value").await;
            assert!(!stored.unwrap());

            let mut conn = wire(b"add key 0 0 0\r\nvalue\r\nERROR\r\n");
            let stored = storage_cmd(&mut conn, b"add", b"key", 0, 0, None, false, b"value").await;
            assert!(stored.is_err());
        })
    }

    #[test]
    fn test_delete() {
        block_on(async {
            let mut conn = wire(b"delete key\r\nDELETED\r\n");
            assert!(delete_cmd(&mut conn, b"key", false).await.unwrap());

            let mut conn = wire(b"delete key\r\nNOT_FOUND\r\n");
            assert!(!delete_cmd(&mut conn, b"key", false).await.unwrap());

            let mut conn = wire(b"delete key noreply\r\n");
            assert!(delete_cmd(&mut conn, b"key", true).await.unwrap());

            let mut conn = wire(b"delete key\r\nERROR\r\n");
            assert!(delete_cmd(&mut conn, b"key", false).await.is_err());
        })
    }

    #[test]
    fn test_auth() {
        block_on(async {
            let mut conn = wire(b"set _ _ _ 3\r\na b\r\nSTORED\r\n");
            assert!(auth_cmd(&mut conn, b"a", b"b").await.is_ok());

            let mut conn = wire(b"set _ _ _ 3\r\na b\r\nERROR\r\n");
            assert!(auth_cmd(&mut conn, b"a", b"b").await.is_err());
        })
    }

    #[test]
    fn test_incr_decr() {
        block_on(async {
            let mut conn = wire(b"incr key 1\r\n2\r\n");
            let value = incr_decr_cmd(&mut conn, b"incr", b"key", 1, false).await;
            assert_eq!(value.unwrap(), Some(2));

            let mut conn = wire(b"incr key 1 noreply\r\n");
            let value = incr_decr_cmd(&mut conn, b"incr", b"key", 1, true).await;
            assert!(value.unwrap().is_none());

            let mut conn = wire(b"incr key 1\r\nNOT_FOUND\r\n");
            let value = incr_decr_cmd(&mut conn, b"incr", b"key", 1, false).await;
            assert!(value.unwrap().is_none());

            let mut conn = wire(b"incr key 1\r\nERROR\r\n");
            let value = incr_decr_cmd(&mut conn, b"incr", b"key", 1, false).await;
            assert!(value.is_err());
        })
    }

    #[test]
    fn test_touch() {
        block_on(async {
            let mut conn = wire(b"touch key 0\r\nTOUCHED\r\n");
            assert!(touch_cmd(&mut conn, b"key", 0, false).await.unwrap());

            let mut conn = wire(b"touch key 0\r\nNOT_FOUND\r\n");
            assert!(!touch_cmd(&mut conn, b"key", 0, false).await.unwrap());

            let mut conn = wire(b"touch key 0 noreply\r\n");
            assert!(touch_cmd(&mut conn, b"key", 0, true).await.unwrap());

            let mut conn = wire(b"touch key 0\r\nERROR\r\n");
            assert!(touch_cmd(&mut conn, b"key", 0, false).await.is_err());
        })
    }

    #[test]
    fn test_retrieval() {
        block_on(async {
            // A bare END means no items were found.
            let mut conn = wire(b"gets key\r\nEND\r\n");
            let found = retrieval_cmd(&mut conn, b"gets", None, &[b"key"]).await;
            assert_eq!(found.unwrap(), vec![]);

            // Without a trailing cas field the item carries `cas_unique: None`.
            let mut conn = wire(b"gat 0 key\r\nVALUE key 0 1\r\na\r\nEND\r\n");
            let found = retrieval_cmd(&mut conn, b"gat", Some(0), &[b"key"]).await;
            assert_eq!(
                found.unwrap(),
                vec![Item {
                    key: "key".to_string(),
                    flags: 0,
                    cas_unique: None,
                    data_block: b"a".to_vec(),
                }]
            );

            let mut conn = wire(
                b"gats 0 key key2\r\nVALUE key 0 1 0\r\na\r\nVALUE key2 0 1 0\r\na\r\nEND\r\n",
            );
            let found = retrieval_cmd(&mut conn, b"gats", Some(0), &[b"key", b"key2"]).await;
            assert_eq!(
                found.unwrap(),
                vec![
                    Item {
                        key: "key".to_string(),
                        flags: 0,
                        cas_unique: Some(0),
                        data_block: b"a".to_vec()
                    },
                    Item {
                        key: "key2".to_string(),
                        flags: 0,
                        cas_unique: Some(0),
                        data_block: b"a".to_vec()
                    }
                ]
            );

            let mut conn = wire(b"get key\r\nERROR\r\n");
            let found = retrieval_cmd(&mut conn, b"get", None, &[b"key"]).await;
            assert!(found.is_err());
        })
    }

    #[test]
    fn test_stats() {
        block_on(async {
            let mut conn = wire(b"stats\r\nSTAT version 1.2.3\r\nSTAT threads 4\r\nEND\r\n");
            let expected = HashMap::from([
                ("version".to_string(), "1.2.3".to_string()),
                ("threads".to_string(), "4".to_string()),
            ]);
            assert_eq!(stats_cmd(&mut conn, StatsArg::Empty).await.unwrap(), expected);

            let mut conn = wire(b"stats settings\r\nERROR\r\n");
            assert!(stats_cmd(&mut conn, StatsArg::Settings).await.is_err());

            let mut conn = wire(b"stats items\r\nERROR\r\n");
            assert!(stats_cmd(&mut conn, StatsArg::Items).await.is_err());

            let mut conn = wire(b"stats sizes\r\nERROR\r\n");
            assert!(stats_cmd(&mut conn, StatsArg::Sizes).await.is_err());

            let mut conn = wire(b"stats slabs\r\nERROR\r\n");
            assert!(stats_cmd(&mut conn, StatsArg::Slabs).await.is_err());

            let mut conn = wire(b"stats conns\r\nERROR\r\n");
            assert!(stats_cmd(&mut conn, StatsArg::Conns).await.is_err());
        })
    }

    #[test]
    fn test_slabs_automove() {
        block_on(async {
            let mut conn = wire(b"slabs automove 0\r\nOK\r\n");
            let reply = slabs_automove_cmd(&mut conn, SlabsAutomoveArg::Zero).await;
            assert!(reply.is_ok());

            let mut conn = wire(b"slabs automove 1\r\nERROR\r\n");
            let reply = slabs_automove_cmd(&mut conn, SlabsAutomoveArg::One).await;
            assert!(reply.is_err());

            let mut conn = wire(b"slabs automove 2\r\nERROR\r\n");
            let reply = slabs_automove_cmd(&mut conn, SlabsAutomoveArg::Two).await;
            assert!(reply.is_err());
        })
    }

    #[test]
    fn test_lru_crawler() {
        block_on(async {
            let mut conn = wire(b"lru_crawler enable\r\nOK\r\n");
            let reply = lru_crawler_cmd(&mut conn, LruCrawlerArg::Enable).await;
            assert!(reply.is_ok());

            let mut conn = wire(b"lru_crawler disable\r\nERROR\r\n");
            let reply = lru_crawler_cmd(&mut conn, LruCrawlerArg::Disable).await;
            assert!(reply.is_err());
        })
    }

    #[test]
    fn test_lru_crawler_sleep() {
        block_on(async {
            let mut conn = wire(b"lru_crawler sleep 1000000\r\nOK\r\n");
            assert!(lru_crawler_sleep_cmd(&mut conn, 1_000_000).await.is_ok());

            let mut conn = wire(b"lru_crawler sleep 0\r\nERROR\r\n");
            assert!(lru_crawler_sleep_cmd(&mut conn, 0).await.is_err());
        })
    }

    #[test]
    fn test_lru_crawler_tocrawl() {
        block_on(async {
            let mut conn = wire(b"lru_crawler tocrawl 0\r\nOK\r\n");
            assert!(lru_crawler_tocrawl_cmd(&mut conn, 0).await.is_ok());

            let mut conn = wire(b"lru_crawler tocrawl 0\r\nERROR\r\n");
            assert!(lru_crawler_tocrawl_cmd(&mut conn, 0).await.is_err());
        })
    }

    #[test]
    fn test_lru_crawler_crawl() {
        block_on(async {
            let mut conn = wire(b"lru_crawler crawl 1,2,3\r\nOK\r\n");
            let reply =
                lru_crawler_crawl_cmd(&mut conn, LruCrawlerCrawlArg::Classids(&[1, 2, 3])).await;
            assert!(reply.is_ok());

            let mut conn = wire(b"lru_crawler crawl all\r\nERROR\r\n");
            let reply = lru_crawler_crawl_cmd(&mut conn, LruCrawlerCrawlArg::All).await;
            assert!(reply.is_err());
        })
    }

    #[test]
    fn test_slabs_reassign() {
        block_on(async {
            let mut conn = wire(b"slabs reassign 1 10\r\nOK\r\n");
            assert!(slabs_reassign_cmd(&mut conn, 1, 10).await.is_ok());

            let mut conn = wire(b"slabs reassign 1 10\r\nERROR\r\n");
            assert!(slabs_reassign_cmd(&mut conn, 1, 10).await.is_err());
        })
    }

    #[test]
    fn test_lru_crawler_metadump() {
        block_on(async {
            let mut conn = wire(b"lru_crawler metadump all\r\nkey=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nkey=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nEND\r\n");
            let dump = lru_crawler_metadump_cmd(&mut conn, LruCrawlerMetadumpArg::All).await;
            assert_eq!(
                dump.unwrap(),
                [
                    "key=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0",
                    "key=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0"
                ]
            );

            let mut conn = wire(b"lru_crawler metadump 1,2,3\r\nERROR\r\n");
            let dump =
                lru_crawler_metadump_cmd(&mut conn, LruCrawlerMetadumpArg::Classids(&[1, 2, 3]))
                    .await;
            assert!(dump.is_err());

            let mut conn = wire(b"lru_crawler metadump hash\r\nERROR\r\n");
            let dump = lru_crawler_metadump_cmd(&mut conn, LruCrawlerMetadumpArg::Hash).await;
            assert!(dump.is_err());
        })
    }

    #[test]
    fn test_lru_crawler_mgdump() {
        block_on(async {
            let mut conn = wire(b"lru_crawler mgdump 3\r\nmg key\r\nmg key2\r\nEN\r\n");
            let dump = lru_crawler_mgdump_cmd(&mut conn, LruCrawlerMgdumpArg::Classids(&[3])).await;
            assert_eq!(dump.unwrap(), ["key", "key2"]);

            let mut conn = wire(b"lru_crawler mgdump all\r\nERROR\r\n");
            let dump = lru_crawler_mgdump_cmd(&mut conn, LruCrawlerMgdumpArg::All).await;
            assert!(dump.is_err());

            let mut conn = wire(b"lru_crawler mgdump hash\r\nERROR\r\n");
            let dump = lru_crawler_mgdump_cmd(&mut conn, LruCrawlerMgdumpArg::Hash).await;
            assert!(dump.is_err());
        })
    }

    #[test]
    fn test_mn() {
        block_on(async {
            let mut conn = wire(b"mn\r\nMN\r\n");
            assert!(mn_cmd(&mut conn).await.is_ok());

            let mut conn = wire(b"mn\r\nERROR\r\n");
            assert!(mn_cmd(&mut conn).await.is_err());
        })
    }

    #[test]
    fn test_me() {
        block_on(async {
            let mut conn = wire(b"me key\r\nEN\r\n");
            assert!(me_cmd(&mut conn, b"key").await.unwrap().is_none());

            let mut conn = wire(b"me key\r\nME key exp=-1 la=3 cas=2 fetch=no cls=1 size=63\r\n");
            let meta = me_cmd(&mut conn, b"key").await.unwrap();
            assert_eq!(
                meta.unwrap(),
                "key exp=-1 la=3 cas=2 fetch=no cls=1 size=63"
            );

            let mut conn = wire(b"me key\r\nERROR\r\n");
            assert!(me_cmd(&mut conn, b"key").await.is_err());
        })
    }

    #[test]
    fn test_pipeline() {
        block_on(async {
            // Requests, in the order they are handed to `execute_cmd`.
            let cmds = [
                b"version\r\n".to_vec(),
                b"quit\r\n".to_vec(),
                b"shutdown\r\n".to_vec(),
                b"cache_memlimit 1\r\n".to_vec(),
                b"cache_memlimit 1 noreply\r\n".to_vec(),
                b"flush_all\r\n".to_vec(),
                b"flush_all 1 noreply\r\n".to_vec(),
                b"cas key 0 0 5 0\r\nvalue\r\n".to_vec(),
                b"append key 0 0 5 noreply\r\nvalue\r\n".to_vec(),
                b"delete key\r\n".to_vec(),
                b"delete key noreply\r\n".to_vec(),
                b"set _ _ _ 3\r\na b\r\n".to_vec(),
                b"incr key 1\r\n".to_vec(),
                b"incr key 1 noreply\r\n".to_vec(),
                b"touch key 0\r\n".to_vec(),
                b"touch key 0 noreply\r\n".to_vec(),
                b"gets key\r\n".to_vec(),
                b"get key key2\r\n".to_vec(),
                b"gat 0 key key2\r\n".to_vec(),
                b"gats 0 key\r\n".to_vec(),
                b"stats\r\n".to_vec(),
                b"slabs automove 0\r\n".to_vec(),
                b"lru_crawler enable\r\n".to_vec(),
                b"lru_crawler disable\r\n".to_vec(),
                b"lru_crawler sleep 1000000\r\n".to_vec(),
                b"lru_crawler tocrawl 0\r\n".to_vec(),
                b"lru_crawler crawl 1,2,3\r\n".to_vec(),
                b"slabs reassign 1 10\r\n".to_vec(),
                b"lru_crawler metadump all\r\n".to_vec(),
                b"lru_crawler mgdump 3\r\n".to_vec(),
                b"mn\r\n".to_vec(),
                b"me key\r\n".to_vec(),
            ];
            // Canned replies; the fixture has fewer entries than `cmds`.
            let rps = [
                b"VERSION 1.2.3\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"STORED\r\n".to_vec(),
                b"DELETED\r\n".to_vec(),
                b"STORED\r\n".to_vec(),
                b"2\r\n".to_vec(),
                b"TOUCHED\r\n".to_vec(),
                b"END\r\n".to_vec(),
                b"END\r\n".to_vec(),
                b"VALUE key 0 1 0\r\na\r\nVALUE key2 0 1 0\r\na\r\nEND\r\n".to_vec(),
                b"VALUE key 0 1 0\r\na\r\nEND\r\n".to_vec(),
                b"STAT version 1.2.3\r\nSTAT threads 4\r\nEND\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"OK\r\n".to_vec(),
                b"key=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nkey=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0\r\nEND\r\n".to_vec(),
                b"mg key\r\nmg key2\r\nEN\r\n".to_vec(),
                b"MN\r\n".to_vec(),
                b"ME key exp=-1 la=3 cas=2 fetch=no cls=1 size=63\r\n".to_vec(),
            ];

            // All requests first, then all replies, in one contiguous buffer.
            let mut script = cmds.concat();
            script.extend_from_slice(&rps.concat());
            let mut conn = Cursor::new(script);

            // Every expected item shares the same flags, cas value and payload.
            let cached = |key: &str| {
                PipelineResponse::Item(Item {
                    key: key.to_string(),
                    flags: 0,
                    cas_unique: Some(0),
                    data_block: b"a".to_vec(),
                })
            };

            let responses = execute_cmd(&mut conn, &cmds).await.unwrap();
            assert_eq!(
                responses,
                [
                    PipelineResponse::String("1.2.3".to_string()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Bool(true),
                    PipelineResponse::Bool(true),
                    PipelineResponse::Bool(true),
                    PipelineResponse::Bool(true),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Value(Some(2)),
                    PipelineResponse::Value(None),
                    PipelineResponse::Bool(true),
                    PipelineResponse::Bool(true),
                    cached("key"),
                    cached("key2"),
                    cached("key"),
                    PipelineResponse::HashMap(HashMap::from([
                        ("threads".to_string(), "4".to_string()),
                        ("version".to_string(), "1.2.3".to_string())
                    ])),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::Unit(()),
                    PipelineResponse::VecString(vec![
                        "key=key exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0"
                            .to_string(),
                        "key=key2 exp=-1 la=1745299782 cas=2 fetch=no cls=1 size=63 flags=0"
                            .to_string()
                    ]),
                    PipelineResponse::VecString(vec!["key".to_string(), "key2".to_string()]),
                    PipelineResponse::Unit(()),
                    PipelineResponse::OptionString(Some(
                        "key exp=-1 la=3 cas=2 fetch=no cls=1 size=63".to_string()
                    ))
                ]
            );

            // An error reply to the first command fails the whole pipeline.
            let cmds = [b"version\r\n".to_vec(), b"quit\r\n".to_vec()];
            let rps = [b"ERROR\r\n".to_vec(), b"OK\r\n".to_vec()];
            let mut script = cmds.concat();
            script.extend_from_slice(&rps.concat());
            let mut conn = Cursor::new(script);
            assert!(execute_cmd(&mut conn, &cmds).await.is_err());
        })
    }
}