1use core::fmt::Display;
4#[cfg(feature = "with-futures")]
5use futures::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
6
7#[cfg(feature = "with-tokio")]
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9
10use std::collections::HashMap;
11use std::io::{Error, ErrorKind};
12use std::marker::Unpin;
13
/// Client for a memcached-style ASCII (text) protocol spoken over an async byte stream.
///
/// `S` is the underlying transport; the methods on this type require it to be
/// `AsyncRead + AsyncWrite + Unpin`.
pub struct Protocol<S> {
    /// Buffered reader wrapped around the transport. Responses are read through it;
    /// commands are written either through it or directly to the inner stream.
    io: BufReader<S>,
    /// Scratch buffer that `read_line` clears and refills with each response line.
    buf: Vec<u8>,
}
19
20impl<S> Protocol<S>
21where
22 S: AsyncRead + AsyncWrite + Unpin,
23{
24 pub fn new(io: S) -> Self {
26 Self {
27 io: BufReader::new(io),
28 buf: Vec::new(),
29 }
30 }
31
32 pub async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Vec<u8>, Error> {
34 let writer = self.io.get_mut();
36 writer
37 .write_all(&[b"get ", key.as_ref(), b"\r\n"].concat())
38 .await?;
39 writer.flush().await?;
40
41 let (val, _) = self.read_get_response().await?;
42 Ok(val)
43 }
44
45 async fn read_get_response(&mut self) -> Result<(Vec<u8>, Option<u64>), Error> {
46 let header = self.read_line().await?;
48 let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
49
50 if header.starts_with("ERROR")
52 || header.starts_with("CLIENT_ERROR")
53 || header.starts_with("SERVER_ERROR")
54 {
55 return Err(Error::new(ErrorKind::Other, header));
56 } else if header.starts_with("END") {
57 return Err(ErrorKind::NotFound.into());
58 }
59
60 let mut parts = header.split(' ');
62 let length: usize = parts
63 .nth(3)
64 .and_then(|len| len.trim_end().parse().ok())
65 .ok_or(ErrorKind::InvalidData)?;
66
67 let cas: Option<u64> = parts.next().and_then(|len| len.trim_end().parse().ok());
69
70 let mut buffer: Vec<u8> = vec![0; length];
72 self.io.read_exact(&mut buffer).await?;
73
74 self.read_line().await?; self.read_line().await?; Ok((buffer, cas))
79 }
80
81 pub async fn get_multi<K: AsRef<[u8]>>(
84 &mut self,
85 keys: &[K],
86 ) -> Result<HashMap<String, Vec<u8>>, Error> {
87 if keys.is_empty() {
88 return Ok(HashMap::new());
89 }
90
91 let writer = self.io.get_mut();
93 writer.write_all("get".as_bytes()).await?;
94 for k in keys {
95 writer.write_all(b" ").await?;
96 writer.write_all(k.as_ref()).await?;
97 }
98 writer.write_all(b"\r\n").await?;
99 writer.flush().await?;
100
101 self.read_many_values().await
103 }
104
105 async fn read_many_values(&mut self) -> Result<HashMap<String, Vec<u8>>, Error> {
106 let mut map = HashMap::new();
107 loop {
108 let header = {
109 let buf = self.read_line().await?;
110 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
111 }
112 .to_string();
113 let mut parts = header.split(' ');
114 match parts.next() {
115 Some("VALUE") => {
116 if let (Some(key), _flags, Some(size_str)) =
117 (parts.next(), parts.next(), parts.next())
118 {
119 let size: usize = size_str
120 .trim_end()
121 .parse()
122 .map_err(|_| Error::from(ErrorKind::InvalidData))?;
123 let mut buffer: Vec<u8> = vec![0; size];
124 self.io.read_exact(&mut buffer).await?;
125 let mut crlf = vec![0; 2];
126 self.io.read_exact(&mut crlf).await?;
127
128 map.insert(key.to_owned(), buffer);
129 } else {
130 return Err(Error::new(ErrorKind::InvalidData, header));
131 }
132 }
133 Some("END\r\n") => return Ok(map),
134 Some("ERROR") => return Err(Error::new(ErrorKind::Other, header)),
135 _ => return Err(Error::new(ErrorKind::InvalidData, header)),
136 }
137 }
138 }
139
140 pub async fn get_prefix<K: Display>(
143 &mut self,
144 key_prefix: K,
145 limit: Option<usize>,
146 ) -> Result<HashMap<String, Vec<u8>>, Error> {
147 let header = if let Some(limit) = limit {
149 format!("get_prefix {} {}\r\n", key_prefix, limit)
150 } else {
151 format!("get_prefix {}\r\n", key_prefix)
152 };
153 self.io.write_all(header.as_bytes()).await?;
154 self.io.flush().await?;
155
156 self.read_many_values().await
158 }
159
160 pub async fn add<K: Display>(
162 &mut self,
163 key: K,
164 val: &[u8],
165 expiration: u32,
166 ) -> Result<(), Error> {
167 let header = format!("add {} 0 {} {}\r\n", key, expiration, val.len());
169 self.io.write_all(header.as_bytes()).await?;
170 self.io.write_all(val).await?;
171 self.io.write_all(b"\r\n").await?;
172 self.io.flush().await?;
173
174 let header = {
176 let buf = self.read_line().await?;
177 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
178 };
179
180 if header.contains("ERROR") {
182 return Err(Error::new(ErrorKind::Other, header));
183 } else if header.starts_with("NOT_STORED") {
184 return Err(ErrorKind::AlreadyExists.into());
185 }
186
187 Ok(())
188 }
189
190 pub async fn set<K: Display>(
192 &mut self,
193 key: K,
194 val: &[u8],
195 expiration: u32,
196 ) -> Result<(), Error> {
197 let header = format!("set {} 0 {} {} noreply\r\n", key, expiration, val.len());
198 self.io.write_all(header.as_bytes()).await?;
199 self.io.write_all(val).await?;
200 self.io.write_all(b"\r\n").await?;
201 self.io.flush().await?;
202 Ok(())
203 }
204
205 pub async fn append<K: Display>(&mut self, key: K, val: &[u8]) -> Result<(), Error> {
207 let header = format!("append {} 0 0 {} noreply\r\n", key, val.len());
208 self.io.write_all(header.as_bytes()).await?;
209 self.io.write_all(val).await?;
210 self.io.write_all(b"\r\n").await?;
211 self.io.flush().await?;
212 Ok(())
213 }
214
215 pub async fn delete<K: Display>(&mut self, key: K) -> Result<(), Error> {
217 let header = format!("delete {} noreply\r\n", key);
218 self.io.write_all(header.as_bytes()).await?;
219 self.io.flush().await?;
220 Ok(())
221 }
222
223 pub async fn version(&mut self) -> Result<String, Error> {
225 self.io.write_all(b"version\r\n").await?;
226 self.io.flush().await?;
227
228 let header = {
230 let buf = self.read_line().await?;
231 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
232 };
233
234 if !header.starts_with("VERSION") {
235 return Err(Error::new(ErrorKind::Other, header));
236 }
237 let version = header.trim_start_matches("VERSION ").trim_end();
238 Ok(version.to_string())
239 }
240
241 pub async fn flush(&mut self) -> Result<(), Error> {
243 self.io.write_all(b"flush_all\r\n").await?;
244 self.io.flush().await?;
245
246 let header = {
248 let buf = self.read_line().await?;
249 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
250 };
251
252 if header == "OK\r\n" {
253 Ok(())
254 } else {
255 Err(Error::new(ErrorKind::Other, header))
256 }
257 }
258
259 pub async fn increment<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
262 let writer = self.io.get_mut();
264 let buf = &[
265 b"incr ",
266 key.as_ref(),
267 b" ",
268 amount.to_string().as_bytes(),
269 b"\r\n",
270 ]
271 .concat();
272 writer.write_all(buf).await?;
273 writer.flush().await?;
274
275 let header = {
277 let buf = self.read_line().await?;
278 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
279 };
280
281 if header == "NOT_FOUND\r\n" {
282 Err(ErrorKind::NotFound.into())
283 } else {
284 let value = header
285 .trim_end()
286 .parse::<u64>()
287 .map_err(|_| Error::from(ErrorKind::InvalidData))?;
288 Ok(value)
289 }
290 }
291
292 pub async fn increment_noreply<K: AsRef<[u8]>>(
294 &mut self,
295 key: K,
296 amount: u64,
297 ) -> Result<(), Error> {
298 let writer = self.io.get_mut();
299 let buf = &[
300 b"incr ",
301 key.as_ref(),
302 b" ",
303 amount.to_string().as_bytes(),
304 b" noreply\r\n",
305 ]
306 .concat();
307 writer.write_all(buf).await?;
308 writer.flush().await?;
309 Ok(())
310 }
311
312 pub async fn decrement<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
315 let writer = self.io.get_mut();
317 let buf = &[
318 b"decr ",
319 key.as_ref(),
320 b" ",
321 amount.to_string().as_bytes(),
322 b"\r\n",
323 ]
324 .concat();
325 writer.write_all(buf).await?;
326 writer.flush().await?;
327
328 let header = {
330 let buf = self.read_line().await?;
331 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
332 };
333
334 if header == "NOT_FOUND\r\n" {
335 Err(ErrorKind::NotFound.into())
336 } else {
337 let value = header
338 .trim_end()
339 .parse::<u64>()
340 .map_err(|_| Error::from(ErrorKind::InvalidData))?;
341 Ok(value)
342 }
343 }
344
345 async fn read_line(&mut self) -> Result<&[u8], Error> {
346 let Self { io, buf } = self;
347 buf.clear();
348 io.read_until(b'\n', buf).await?;
349 if buf.last().copied() != Some(b'\n') {
350 return Err(ErrorKind::UnexpectedEof.into());
351 }
352 Ok(&buf[..])
353 }
354
355 pub async fn gets_cas<K: AsRef<[u8]>>(&mut self, key: K) -> Result<(Vec<u8>, u64), Error> {
357 let writer = self.io.get_mut();
359 writer
360 .write_all(&[b"gets ", key.as_ref(), b"\r\n"].concat())
361 .await?;
362 writer.flush().await?;
363
364 let (val, maybe_cas) = self.read_get_response().await?;
365 let cas = maybe_cas.ok_or(ErrorKind::InvalidData)?;
366
367 Ok((val, cas))
368 }
369
370 pub async fn cas<K: Display>(
375 &mut self,
376 key: K,
377 val: &[u8],
378 cas_id: u64,
379 expiration: u32,
380 ) -> Result<bool, Error> {
381 let header = format!("cas {} 0 {} {} {}\r\n", key, expiration, val.len(), cas_id);
382 self.io.write_all(header.as_bytes()).await?;
383 self.io.write_all(val).await?;
384 self.io.write_all(b"\r\n").await?;
385 self.io.flush().await?;
386
387 let header = {
389 let buf = self.read_line().await?;
390 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
391 };
392
393 if header.starts_with("STORED") {
411 Ok(true)
412 } else if header.starts_with("EXISTS") || header.starts_with("NOT_STORED") {
413 Ok(false)
414 } else if header.starts_with("NOT FOUND") {
415 Err(ErrorKind::NotFound.into())
416 } else {
417 Err(Error::new(ErrorKind::Other, header))
418 }
419 }
420
421 pub async fn append_or_vivify<K: Display>(
423 &mut self,
424 key: K,
425 val: &[u8],
426 ttl: u32,
427 ) -> Result<(), Error> {
428 let header = format!("ms {} {} MA N{}\r\n", key, val.len(), ttl);
441 self.io.write_all(header.as_bytes()).await?;
442 self.io.write_all(val).await?;
443 self.io.write_all(b"\r\n").await?;
444 self.io.flush().await?;
445
446 let header = {
448 let buf = self.read_line().await?;
449 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
450 };
451
452 if header.starts_with("HD") {
473 Ok(())
474 } else {
475 Err(Error::new(ErrorKind::Other, header))
476 }
477 }
478}
479
#[cfg(test)]
mod tests {
    use futures::executor::block_on;
    use futures::io::{AsyncRead, AsyncWrite};

    use std::io::{Cursor, Error, ErrorKind, Read, Write};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// In-memory stand-in for a server connection: reads are served from `r`,
    /// writes are captured in `w`.
    struct Cache {
        r: Cursor<Vec<u8>>,
        w: Cursor<Vec<u8>>,
    }

    impl Cache {
        /// A connection with nothing to read and nothing written yet.
        fn new() -> Self {
            Cache {
                r: Cursor::new(Vec::new()),
                w: Cursor::new(Vec::new()),
            }
        }

        /// A connection whose read side is preloaded with `response`.
        fn with_response(response: &[u8]) -> Self {
            let mut cache = Self::new();
            cache.r.get_mut().extend_from_slice(response);
            cache
        }
    }

    impl AsyncRead for Cache {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.r.read(buf))
        }
    }

    impl AsyncWrite for Cache {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.write(buf))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.flush())
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

    #[test]
    fn test_ascii_get() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get2() {
        let mut cache = Cache::with_response(
            b"VALUE foo 0 3\r\nbar\r\nEND\r\nVALUE bar 0 3\r\nbaz\r\nEND\r\n",
        );
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(block_on(ascii.get(&"bar")).unwrap(), b"baz");
    }

    #[test]
    fn test_ascii_get_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 99999\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::NotFound);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_eof_error() {
        let mut cache = Cache::with_response(b"EN");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_one() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_many() {
        let mut cache =
            Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nVALUE baz 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo", "baz", "blah"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(map.get("baz").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get foo baz blah\r\n");
    }

    #[test]
    fn test_ascii_get_prefix() {
        let mut cache =
            Cache::with_response(b"VALUE key 0 3\r\nbar\r\nVALUE kez 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let key_prefix = "ke";
        let map = block_on(ascii.get_prefix(&key_prefix, None)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("key").unwrap(), b"bar");
        assert_eq!(map.get("kez").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get_prefix ke\r\n");
    }

    #[test]
    fn test_ascii_get_multi_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo", "baz"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert!(map.is_empty());
        assert_eq!(cache.w.get_ref(), b"get foo baz\r\n");
    }

    #[test]
    fn test_ascii_get_multi_zero_keys() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let map = block_on(ascii.get_multi::<&str>(&[])).unwrap();
        assert!(map.is_empty());
        // Nothing may be sent when there are no keys to ask for.
        assert_eq!(cache.w.get_ref(), b"");
    }

    #[test]
    fn test_ascii_set() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::new();
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.set(&key, val.as_bytes(), ttl)).unwrap();
        let expected =
            format!("set {} 0 {} {} noreply\r\n{}\r\n", key, ttl, val.len(), val).into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_ascii_increment_noreply() {
        let (key, amount) = ("foo", 5);
        let mut cache = Cache::new();
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.increment_noreply(key.as_bytes(), amount)).unwrap();
        let expected = format!("incr {} {} noreply\r\n", key, amount).into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_ascii_add_new_key() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap();
        let expected = format!("add {} 0 {} {}\r\n{}\r\n", key, ttl, val.len(), val).into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_ascii_add_duplicate() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"NOT_STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::AlreadyExists);
    }

    #[test]
    fn test_ascii_version() {
        let mut cache = Cache::with_response(b"VERSION 1.6.6\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.version()).unwrap(), "1.6.6");
        assert_eq!(cache.w.get_ref(), b"version\r\n");
    }

    #[test]
    fn test_ascii_flush() {
        let mut cache = Cache::with_response(b"OK\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert!(block_on(ascii.flush()).is_ok());
        assert_eq!(cache.w.get_ref(), b"flush_all\r\n");
    }

    #[test]
    fn test_ascii_increment() {
        let mut cache = Cache::with_response(b"2\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.increment("foo", 1)).unwrap(), 2);
        assert_eq!(cache.w.get_ref(), b"incr foo 1\r\n");
    }

    #[test]
    fn test_ascii_decrement() {
        let mut cache = Cache::with_response(b"0\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.decrement("foo", 1)).unwrap(), 0);
        assert_eq!(cache.w.get_ref(), b"decr foo 1\r\n");
    }

    #[test]
    fn test_ascii_gets_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 77\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(
            block_on(ascii.gets_cas(&"foo")).unwrap(),
            (b"bar".to_vec(), 77)
        );
        assert_eq!(cache.w.get_ref(), b"gets foo\r\n");
    }

    #[test]
    fn test_ascii_cas() {
        let (key, val, cas_id, ttl) = ("foo", "bar", 33, 5);
        let mut cache = Cache::with_response(b"STORED\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.cas(&key, val.as_bytes(), cas_id, ttl)).unwrap();
        let expected = format!(
            "cas {} 0 {} {} {}\r\n{}\r\n",
            key,
            ttl,
            val.len(),
            cas_id,
            val
        )
        .into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_ascii_append_or_vivify() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"HD\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.append_or_vivify(&key, val.as_bytes(), ttl)).unwrap();
        let expected = format!("ms {} {} MA N{}\r\n{}\r\n", key, val.len(), ttl, val).into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_response_starting_with_error() {
        let mut cache = Cache::with_response(b"ERROR\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_response_containing_error_string() {
        // "ERROR" inside the key must not be mistaken for an error reply.
        let mut cache = Cache::with_response(b"VALUE contains_ERROR 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
    }

    #[test]
    fn test_client_error_response() {
        let mut cache = Cache::with_response(b"CLIENT_ERROR bad command\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_server_error_response() {
        let mut cache = Cache::with_response(b"SERVER_ERROR out of memory\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }
}