1use core::fmt::Display;
4#[cfg(feature = "with-futures")]
5use futures::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
6
7#[cfg(feature = "with-tokio")]
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9
10use std::collections::HashMap;
11use std::io::{Error, ErrorKind};
12use std::marker::Unpin;
13
/// Reports whether a response line is one of the memcached error replies.
///
/// A line counts as an error when it begins with `ERROR`, `CLIENT_ERROR` or
/// `SERVER_ERROR`. The match is anchored at the start of the line, so the same
/// words appearing later in the line (for example inside a key) do not count.
fn is_memcached_error(header: &str) -> bool {
    const ERROR_PREFIXES: [&str; 3] = ["ERROR", "CLIENT_ERROR", "SERVER_ERROR"];

    ERROR_PREFIXES
        .iter()
        .any(|prefix| header.starts_with(*prefix))
}
20
21pub struct Protocol<S> {
23 io: BufReader<S>,
24 buf: Vec<u8>,
25}
26
27impl<S> Protocol<S>
28where
29 S: AsyncRead + AsyncWrite + Unpin,
30{
31 pub fn new(io: S) -> Self {
33 Self {
34 io: BufReader::new(io),
35 buf: Vec::new(),
36 }
37 }
38
39 pub async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Vec<u8>, Error> {
41 let writer = self.io.get_mut();
43 writer
44 .write_all(&[b"get ", key.as_ref(), b"\r\n"].concat())
45 .await?;
46 writer.flush().await?;
47
48 let (val, _) = self.read_get_response().await?;
49 Ok(val)
50 }
51
52 async fn read_get_response(&mut self) -> Result<(Vec<u8>, Option<u64>), Error> {
53 let header = self.read_line().await?;
55 let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
56
57 if is_memcached_error(header) {
59 return Err(Error::new(ErrorKind::Other, header));
60 } else if header.starts_with("END") {
61 return Err(ErrorKind::NotFound.into());
62 }
63
64 let mut parts = header.split(' ');
66 let length: usize = parts
67 .nth(3)
68 .and_then(|len| len.trim_end().parse().ok())
69 .ok_or(ErrorKind::InvalidData)?;
70
71 let cas: Option<u64> = parts.next().and_then(|len| len.trim_end().parse().ok());
73
74 let mut buffer: Vec<u8> = vec![0; length];
76 self.io.read_exact(&mut buffer).await?;
77
78 self.read_line().await?; self.read_line().await?; Ok((buffer, cas))
83 }
84
85 pub async fn get_multi<K: AsRef<[u8]>>(
88 &mut self,
89 keys: &[K],
90 ) -> Result<HashMap<String, Vec<u8>>, Error> {
91 if keys.is_empty() {
92 return Ok(HashMap::new());
93 }
94
95 let writer = self.io.get_mut();
97 writer.write_all("get".as_bytes()).await?;
98 for k in keys {
99 writer.write_all(b" ").await?;
100 writer.write_all(k.as_ref()).await?;
101 }
102 writer.write_all(b"\r\n").await?;
103 writer.flush().await?;
104
105 self.read_many_values().await
107 }
108
109 async fn read_many_values(&mut self) -> Result<HashMap<String, Vec<u8>>, Error> {
110 let mut map = HashMap::new();
111 loop {
112 let header = {
113 let buf = self.read_line().await?;
114 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
115 }
116 .to_string();
117 let mut parts = header.split(' ');
118 match parts.next() {
119 Some("VALUE") => {
120 if let (Some(key), _flags, Some(size_str)) =
121 (parts.next(), parts.next(), parts.next())
122 {
123 let size: usize = size_str
124 .trim_end()
125 .parse()
126 .map_err(|_| Error::from(ErrorKind::InvalidData))?;
127 let mut buffer: Vec<u8> = vec![0; size];
128 self.io.read_exact(&mut buffer).await?;
129 let mut crlf = vec![0; 2];
130 self.io.read_exact(&mut crlf).await?;
131
132 map.insert(key.to_owned(), buffer);
133 } else {
134 return Err(Error::new(ErrorKind::InvalidData, header));
135 }
136 }
137 Some("END\r\n") => return Ok(map),
138 Some("ERROR") => return Err(Error::new(ErrorKind::Other, header)),
139 _ => return Err(Error::new(ErrorKind::InvalidData, header)),
140 }
141 }
142 }
143
144 pub async fn get_prefix<K: Display>(
147 &mut self,
148 key_prefix: K,
149 limit: Option<usize>,
150 ) -> Result<HashMap<String, Vec<u8>>, Error> {
151 let header = if let Some(limit) = limit {
153 format!("get_prefix {} {}\r\n", key_prefix, limit)
154 } else {
155 format!("get_prefix {}\r\n", key_prefix)
156 };
157 self.io.write_all(header.as_bytes()).await?;
158 self.io.flush().await?;
159
160 self.read_many_values().await
162 }
163
164 pub async fn add<K: Display>(
166 &mut self,
167 key: K,
168 val: &[u8],
169 expiration: u32,
170 ) -> Result<(), Error> {
171 let header = format!("add {} 0 {} {}\r\n", key, expiration, val.len());
173 self.io.write_all(header.as_bytes()).await?;
174 self.io.write_all(val).await?;
175 self.io.write_all(b"\r\n").await?;
176 self.io.flush().await?;
177
178 let header = {
180 let buf = self.read_line().await?;
181 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
182 };
183
184 if is_memcached_error(header) {
186 return Err(Error::new(ErrorKind::Other, header));
187 } else if header.starts_with("NOT_STORED") {
188 return Err(ErrorKind::AlreadyExists.into());
189 }
190
191 Ok(())
192 }
193
194 pub async fn set<K: Display>(
196 &mut self,
197 key: K,
198 val: &[u8],
199 expiration: u32,
200 ) -> Result<(), Error> {
201 let header = format!("set {} 0 {} {} noreply\r\n", key, expiration, val.len());
202 self.io.write_all(header.as_bytes()).await?;
203 self.io.write_all(val).await?;
204 self.io.write_all(b"\r\n").await?;
205 self.io.flush().await?;
206 Ok(())
207 }
208
209 pub async fn append<K: Display>(&mut self, key: K, val: &[u8]) -> Result<(), Error> {
211 let header = format!("append {} 0 0 {} noreply\r\n", key, val.len());
212 self.io.write_all(header.as_bytes()).await?;
213 self.io.write_all(val).await?;
214 self.io.write_all(b"\r\n").await?;
215 self.io.flush().await?;
216 Ok(())
217 }
218
219 pub async fn delete<K: Display>(&mut self, key: K) -> Result<(), Error> {
221 let header = format!("delete {} noreply\r\n", key);
222 self.io.write_all(header.as_bytes()).await?;
223 self.io.flush().await?;
224 Ok(())
225 }
226
227 pub async fn version(&mut self) -> Result<String, Error> {
229 self.io.write_all(b"version\r\n").await?;
230 self.io.flush().await?;
231
232 let header = {
234 let buf = self.read_line().await?;
235 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
236 };
237
238 if !header.starts_with("VERSION") {
239 return Err(Error::new(ErrorKind::Other, header));
240 }
241 let version = header.trim_start_matches("VERSION ").trim_end();
242 Ok(version.to_string())
243 }
244
245 pub async fn flush(&mut self) -> Result<(), Error> {
247 self.io.write_all(b"flush_all\r\n").await?;
248 self.io.flush().await?;
249
250 let header = {
252 let buf = self.read_line().await?;
253 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
254 };
255
256 if header == "OK\r\n" {
257 Ok(())
258 } else {
259 Err(Error::new(ErrorKind::Other, header))
260 }
261 }
262
263 pub async fn increment<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
268 let writer = self.io.get_mut();
270 let buf = &[
271 b"incr ",
272 key.as_ref(),
273 b" ",
274 amount.to_string().as_bytes(),
275 b"\r\n",
276 ]
277 .concat();
278 writer.write_all(buf).await?;
279 writer.flush().await?;
280
281 let header = {
283 let buf = self.read_line().await?;
284 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
285 };
286
287 if header == "NOT_FOUND\r\n" {
288 Err(ErrorKind::NotFound.into())
289 } else if is_memcached_error(header) {
290 Err(Error::new(ErrorKind::Other, header))
291 } else {
292 let value = header
293 .trim_end()
294 .parse::<u64>()
295 .map_err(|_| Error::from(ErrorKind::InvalidData))?;
296 Ok(value)
297 }
298 }
299
300 pub async fn increment_noreply<K: AsRef<[u8]>>(
302 &mut self,
303 key: K,
304 amount: u64,
305 ) -> Result<(), Error> {
306 let writer = self.io.get_mut();
307 let buf = &[
308 b"incr ",
309 key.as_ref(),
310 b" ",
311 amount.to_string().as_bytes(),
312 b" noreply\r\n",
313 ]
314 .concat();
315 writer.write_all(buf).await?;
316 writer.flush().await?;
317 Ok(())
318 }
319
320 pub async fn decrement<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
325 let writer = self.io.get_mut();
327 let buf = &[
328 b"decr ",
329 key.as_ref(),
330 b" ",
331 amount.to_string().as_bytes(),
332 b"\r\n",
333 ]
334 .concat();
335 writer.write_all(buf).await?;
336 writer.flush().await?;
337
338 let header = {
340 let buf = self.read_line().await?;
341 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
342 };
343
344 if header == "NOT_FOUND\r\n" {
345 Err(ErrorKind::NotFound.into())
346 } else if is_memcached_error(header) {
347 Err(Error::new(ErrorKind::Other, header))
348 } else {
349 let value = header
350 .trim_end()
351 .parse::<u64>()
352 .map_err(|_| Error::from(ErrorKind::InvalidData))?;
353 Ok(value)
354 }
355 }
356
357 async fn read_line(&mut self) -> Result<&[u8], Error> {
358 let Self { io, buf } = self;
359 buf.clear();
360 io.read_until(b'\n', buf).await?;
361 if buf.last().copied() != Some(b'\n') {
362 return Err(ErrorKind::UnexpectedEof.into());
363 }
364 Ok(&buf[..])
365 }
366
367 pub async fn gets_cas<K: AsRef<[u8]>>(&mut self, key: K) -> Result<(Vec<u8>, u64), Error> {
369 let writer = self.io.get_mut();
371 writer
372 .write_all(&[b"gets ", key.as_ref(), b"\r\n"].concat())
373 .await?;
374 writer.flush().await?;
375
376 let (val, maybe_cas) = self.read_get_response().await?;
377 let cas = maybe_cas.ok_or(ErrorKind::InvalidData)?;
378
379 Ok((val, cas))
380 }
381
382 pub async fn cas<K: Display>(
387 &mut self,
388 key: K,
389 val: &[u8],
390 cas_id: u64,
391 expiration: u32,
392 ) -> Result<bool, Error> {
393 let header = format!("cas {} 0 {} {} {}\r\n", key, expiration, val.len(), cas_id);
394 self.io.write_all(header.as_bytes()).await?;
395 self.io.write_all(val).await?;
396 self.io.write_all(b"\r\n").await?;
397 self.io.flush().await?;
398
399 let header = {
401 let buf = self.read_line().await?;
402 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
403 };
404
405 if header.starts_with("STORED") {
423 Ok(true)
424 } else if header.starts_with("EXISTS") || header.starts_with("NOT_STORED") {
425 Ok(false)
426 } else if header.starts_with("NOT FOUND") {
427 Err(ErrorKind::NotFound.into())
428 } else {
429 Err(Error::new(ErrorKind::Other, header))
430 }
431 }
432
433 pub async fn append_or_vivify<K: Display>(
435 &mut self,
436 key: K,
437 val: &[u8],
438 ttl: u32,
439 ) -> Result<(), Error> {
440 let header = format!("ms {} {} MA N{}\r\n", key, val.len(), ttl);
453 self.io.write_all(header.as_bytes()).await?;
454 self.io.write_all(val).await?;
455 self.io.write_all(b"\r\n").await?;
456 self.io.flush().await?;
457
458 let header = {
460 let buf = self.read_line().await?;
461 std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
462 };
463
464 if header.starts_with("HD") {
485 Ok(())
486 } else {
487 Err(Error::new(ErrorKind::Other, header))
488 }
489 }
490}
491
#[cfg(test)]
mod tests {
    use futures::executor::block_on;
    use futures::io::{AsyncRead, AsyncWrite};

    use std::io::{Cursor, Error, ErrorKind, Read, Write};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// In-memory stand-in for a server connection.
    ///
    /// `r` holds the bytes the "server" will answer with and `w` records
    /// everything the client writes.
    struct Cache {
        r: Cursor<Vec<u8>>,
        w: Cursor<Vec<u8>>,
    }

    impl Cache {
        /// A connection with nothing to read.
        fn new() -> Self {
            Self::with_response(b"")
        }

        /// A connection whose read side is pre-loaded with `response`.
        fn with_response(response: &[u8]) -> Self {
            Cache {
                r: Cursor::new(response.to_vec()),
                w: Cursor::new(Vec::new()),
            }
        }
    }

    impl AsyncRead for Cache {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.r.read(buf))
        }
    }

    impl AsyncWrite for Cache {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.write(buf))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.flush())
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

    #[test]
    fn test_ascii_get() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get2() {
        // Two back-to-back replies: the first read must leave the stream
        // positioned exactly at the second.
        let mut cache =
            Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\nVALUE bar 0 3\r\nbaz\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(block_on(ascii.get(&"bar")).unwrap(), b"baz");
    }

    #[test]
    fn test_ascii_get_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 99999\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::NotFound);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_eof_error() {
        let mut cache = Cache::with_response(b"EN");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_one() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_many() {
        let mut cache =
            Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nVALUE baz 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo", "baz", "blah"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(map.get("baz").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get foo baz blah\r\n");
    }

    #[test]
    fn test_ascii_get_prefix() {
        let mut cache =
            Cache::with_response(b"VALUE key 0 3\r\nbar\r\nVALUE kez 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let key_prefix = "ke";
        let map = block_on(ascii.get_prefix(&key_prefix, None)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("key").unwrap(), b"bar");
        assert_eq!(map.get("kez").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get_prefix ke\r\n");
    }

    #[test]
    fn test_ascii_get_multi_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo", "baz"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert!(map.is_empty());
        assert_eq!(cache.w.get_ref(), b"get foo baz\r\n");
    }

    #[test]
    fn test_ascii_get_multi_zero_keys() {
        // With no keys nothing may be written, even though a reply is queued.
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let map = block_on(ascii.get_multi::<&str>(&[])).unwrap();
        assert!(map.is_empty());
        assert_eq!(cache.w.get_ref(), b"");
    }

    #[test]
    fn test_ascii_set() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::new();
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.set(&key, val.as_bytes(), ttl)).unwrap();

        let expected = format!("set {} 0 {} {} noreply\r\n{}\r\n", key, ttl, val.len(), val);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_increment_noreply() {
        let (key, amount) = ("foo", 5);
        let mut cache = Cache::new();
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.increment_noreply(key.as_bytes(), amount)).unwrap();

        let expected = format!("incr {} {} noreply\r\n", key, amount);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_add_new_key() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap();

        let expected = format!("add {} 0 {} {}\r\n{}\r\n", key, ttl, val.len(), val);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_add_duplicate() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"NOT_STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::AlreadyExists);
    }

    #[test]
    fn test_ascii_version() {
        let mut cache = Cache::with_response(b"VERSION 1.6.6\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.version()).unwrap(), "1.6.6");
        assert_eq!(cache.w.get_ref(), b"version\r\n");
    }

    #[test]
    fn test_ascii_flush() {
        let mut cache = Cache::with_response(b"OK\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert!(block_on(ascii.flush()).is_ok());
        assert_eq!(cache.w.get_ref(), b"flush_all\r\n");
    }

    #[test]
    fn test_ascii_increment() {
        let mut cache = Cache::with_response(b"2\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.increment("foo", 1)).unwrap(), 2);
        assert_eq!(cache.w.get_ref(), b"incr foo 1\r\n");
    }

    #[test]
    fn test_ascii_decrement() {
        let mut cache = Cache::with_response(b"0\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.decrement("foo", 1)).unwrap(), 0);
        assert_eq!(cache.w.get_ref(), b"decr foo 1\r\n");
    }

    #[test]
    fn test_ascii_increment_not_found() {
        let mut cache = Cache::with_response(b"NOT_FOUND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::NotFound);
    }

    #[test]
    fn test_ascii_increment_client_error() {
        let mut cache = Cache::with_response(
            b"CLIENT_ERROR cannot increment or decrement non-numeric value\r\n",
        );
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
        assert!(err
            .to_string()
            .contains("cannot increment or decrement non-numeric value"));
    }

    #[test]
    fn test_ascii_increment_server_error() {
        let mut cache = Cache::with_response(b"SERVER_ERROR out of memory\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
        assert!(err.to_string().contains("out of memory"));
    }

    #[test]
    fn test_ascii_increment_error() {
        let mut cache = Cache::with_response(b"ERROR\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_ascii_decrement_not_found() {
        let mut cache = Cache::with_response(b"NOT_FOUND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.decrement("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::NotFound);
    }

    #[test]
    fn test_ascii_decrement_client_error() {
        let mut cache = Cache::with_response(
            b"CLIENT_ERROR cannot increment or decrement non-numeric value\r\n",
        );
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.decrement("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
        assert!(err
            .to_string()
            .contains("cannot increment or decrement non-numeric value"));
    }

    #[test]
    fn test_ascii_gets_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 77\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(
            block_on(ascii.gets_cas(&"foo")).unwrap(),
            (b"bar".to_vec(), 77)
        );
        assert_eq!(cache.w.get_ref(), b"gets foo\r\n");
    }

    #[test]
    fn test_ascii_cas() {
        let (key, val, cas_id, ttl) = ("foo", "bar", 33, 5);
        let mut cache = Cache::with_response(b"STORED\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.cas(&key, val.as_bytes(), cas_id, ttl)).unwrap();

        let expected = format!(
            "cas {} 0 {} {} {}\r\n{}\r\n",
            key,
            ttl,
            val.len(),
            cas_id,
            val
        );
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_append_or_vivify() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"HD\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.append_or_vivify(&key, val.as_bytes(), ttl)).unwrap();

        let expected = format!("ms {} {} MA N{}\r\n{}\r\n", key, val.len(), ttl, val);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_response_starting_with_error() {
        let mut cache = Cache::with_response(b"ERROR\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_response_containing_error_string() {
        // "ERROR" inside the key must not be mistaken for an error reply.
        let mut cache = Cache::with_response(b"VALUE contains_ERROR 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
    }

    #[test]
    fn test_client_error_response() {
        let mut cache = Cache::with_response(b"CLIENT_ERROR bad command\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_server_error_response() {
        let mut cache = Cache::with_response(b"SERVER_ERROR out of memory\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }
}