memcache_async/
ascii.rs

1//! This is a simplified implementation of [rust-memcache](https://github.com/aisk/rust-memcache)
2//! ported for AsyncRead + AsyncWrite.
3use core::fmt::Display;
4#[cfg(feature = "with-futures")]
5use futures::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
6
7#[cfg(feature = "with-tokio")]
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9
10use std::collections::HashMap;
11use std::io::{Error, ErrorKind};
12use std::marker::Unpin;
13
14/// Memcache ASCII protocol implementation.
15pub struct Protocol<S> {
16    io: BufReader<S>,
17    buf: Vec<u8>,
18}
19
20impl<S> Protocol<S>
21where
22    S: AsyncRead + AsyncWrite + Unpin,
23{
24    /// Creates the ASCII protocol on a stream.
25    pub fn new(io: S) -> Self {
26        Self {
27            io: BufReader::new(io),
28            buf: Vec::new(),
29        }
30    }
31
32    /// Returns the value for given key as bytes. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
33    pub async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Vec<u8>, Error> {
34        // Send command
35        let writer = self.io.get_mut();
36        writer
37            .write_all(&[b"get ", key.as_ref(), b"\r\n"].concat())
38            .await?;
39        writer.flush().await?;
40
41        let (val, _) = self.read_get_response().await?;
42        Ok(val)
43    }
44
45    async fn read_get_response(&mut self) -> Result<(Vec<u8>, Option<u64>), Error> {
46        // Read response header
47        let header = self.read_line().await?;
48        let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
49
50        // Check response header and parse value length
51        if header.starts_with("ERROR")
52            || header.starts_with("CLIENT_ERROR")
53            || header.starts_with("SERVER_ERROR")
54        {
55            return Err(Error::new(ErrorKind::Other, header));
56        } else if header.starts_with("END") {
57            return Err(ErrorKind::NotFound.into());
58        }
59
60        // VALUE <key> <flags> <bytes> [<cas unique>]\r\n
61        let mut parts = header.split(' ');
62        let length: usize = parts
63            .nth(3)
64            .and_then(|len| len.trim_end().parse().ok())
65            .ok_or(ErrorKind::InvalidData)?;
66
67        // cas is present only if gets is called
68        let cas: Option<u64> = parts.next().and_then(|len| len.trim_end().parse().ok());
69
70        // Read value
71        let mut buffer: Vec<u8> = vec![0; length];
72        self.io.read_exact(&mut buffer).await?;
73
74        // Read the trailing header
75        self.read_line().await?; // \r\n
76        self.read_line().await?; // END\r\n
77
78        Ok((buffer, cas))
79    }
80
81    /// Returns values for multiple keys in a single call as a [`HashMap`] from keys to found values.
82    /// If a key is not present in memcached it will be absent from returned map.
83    pub async fn get_multi<K: AsRef<[u8]>>(
84        &mut self,
85        keys: &[K],
86    ) -> Result<HashMap<String, Vec<u8>>, Error> {
87        if keys.is_empty() {
88            return Ok(HashMap::new());
89        }
90
91        // Send command
92        let writer = self.io.get_mut();
93        writer.write_all("get".as_bytes()).await?;
94        for k in keys {
95            writer.write_all(b" ").await?;
96            writer.write_all(k.as_ref()).await?;
97        }
98        writer.write_all(b"\r\n").await?;
99        writer.flush().await?;
100
101        // Read response header
102        self.read_many_values().await
103    }
104
105    async fn read_many_values(&mut self) -> Result<HashMap<String, Vec<u8>>, Error> {
106        let mut map = HashMap::new();
107        loop {
108            let header = {
109                let buf = self.read_line().await?;
110                std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
111            }
112            .to_string();
113            let mut parts = header.split(' ');
114            match parts.next() {
115                Some("VALUE") => {
116                    if let (Some(key), _flags, Some(size_str)) =
117                        (parts.next(), parts.next(), parts.next())
118                    {
119                        let size: usize = size_str
120                            .trim_end()
121                            .parse()
122                            .map_err(|_| Error::from(ErrorKind::InvalidData))?;
123                        let mut buffer: Vec<u8> = vec![0; size];
124                        self.io.read_exact(&mut buffer).await?;
125                        let mut crlf = vec![0; 2];
126                        self.io.read_exact(&mut crlf).await?;
127
128                        map.insert(key.to_owned(), buffer);
129                    } else {
130                        return Err(Error::new(ErrorKind::InvalidData, header));
131                    }
132                }
133                Some("END\r\n") => return Ok(map),
134                Some("ERROR") => return Err(Error::new(ErrorKind::Other, header)),
135                _ => return Err(Error::new(ErrorKind::InvalidData, header)),
136            }
137        }
138    }
139
140    /// Get up to `limit` keys which match the given prefix. Returns a [HashMap] from keys to found values.
141    /// This is not part of the Memcached standard, but some servers implement it nonetheless.
142    pub async fn get_prefix<K: Display>(
143        &mut self,
144        key_prefix: K,
145        limit: Option<usize>,
146    ) -> Result<HashMap<String, Vec<u8>>, Error> {
147        // Send command
148        let header = if let Some(limit) = limit {
149            format!("get_prefix {} {}\r\n", key_prefix, limit)
150        } else {
151            format!("get_prefix {}\r\n", key_prefix)
152        };
153        self.io.write_all(header.as_bytes()).await?;
154        self.io.flush().await?;
155
156        // Read response header
157        self.read_many_values().await
158    }
159
160    /// Add a key. If the value exists, [`ErrorKind::AlreadyExists`] is returned.
161    pub async fn add<K: Display>(
162        &mut self,
163        key: K,
164        val: &[u8],
165        expiration: u32,
166    ) -> Result<(), Error> {
167        // Send command
168        let header = format!("add {} 0 {} {}\r\n", key, expiration, val.len());
169        self.io.write_all(header.as_bytes()).await?;
170        self.io.write_all(val).await?;
171        self.io.write_all(b"\r\n").await?;
172        self.io.flush().await?;
173
174        // Read response header
175        let header = {
176            let buf = self.read_line().await?;
177            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
178        };
179
180        // Check response header and parse value length
181        if header.contains("ERROR") {
182            return Err(Error::new(ErrorKind::Other, header));
183        } else if header.starts_with("NOT_STORED") {
184            return Err(ErrorKind::AlreadyExists.into());
185        }
186
187        Ok(())
188    }
189
190    /// Set key to given value and don't wait for response.
191    pub async fn set<K: Display>(
192        &mut self,
193        key: K,
194        val: &[u8],
195        expiration: u32,
196    ) -> Result<(), Error> {
197        let header = format!("set {} 0 {} {} noreply\r\n", key, expiration, val.len());
198        self.io.write_all(header.as_bytes()).await?;
199        self.io.write_all(val).await?;
200        self.io.write_all(b"\r\n").await?;
201        self.io.flush().await?;
202        Ok(())
203    }
204
205    /// Append bytes to the value in memcached and don't wait for response.
206    pub async fn append<K: Display>(&mut self, key: K, val: &[u8]) -> Result<(), Error> {
207        let header = format!("append {} 0 0 {} noreply\r\n", key, val.len());
208        self.io.write_all(header.as_bytes()).await?;
209        self.io.write_all(val).await?;
210        self.io.write_all(b"\r\n").await?;
211        self.io.flush().await?;
212        Ok(())
213    }
214
215    /// Delete a key and don't wait for response.
216    pub async fn delete<K: Display>(&mut self, key: K) -> Result<(), Error> {
217        let header = format!("delete {} noreply\r\n", key);
218        self.io.write_all(header.as_bytes()).await?;
219        self.io.flush().await?;
220        Ok(())
221    }
222
223    /// Return the version of the remote server.
224    pub async fn version(&mut self) -> Result<String, Error> {
225        self.io.write_all(b"version\r\n").await?;
226        self.io.flush().await?;
227
228        // Read response header
229        let header = {
230            let buf = self.read_line().await?;
231            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
232        };
233
234        if !header.starts_with("VERSION") {
235            return Err(Error::new(ErrorKind::Other, header));
236        }
237        let version = header.trim_start_matches("VERSION ").trim_end();
238        Ok(version.to_string())
239    }
240
241    /// Delete all keys from the cache.
242    pub async fn flush(&mut self) -> Result<(), Error> {
243        self.io.write_all(b"flush_all\r\n").await?;
244        self.io.flush().await?;
245
246        // Read response header
247        let header = {
248            let buf = self.read_line().await?;
249            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
250        };
251
252        if header == "OK\r\n" {
253            Ok(())
254        } else {
255            Err(Error::new(ErrorKind::Other, header))
256        }
257    }
258
259    /// Increment a specific integer stored with a key by a given value. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
260    /// Otherwise the new value is returned
261    pub async fn increment<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
262        // Send command
263        let writer = self.io.get_mut();
264        let buf = &[
265            b"incr ",
266            key.as_ref(),
267            b" ",
268            amount.to_string().as_bytes(),
269            b"\r\n",
270        ]
271        .concat();
272        writer.write_all(buf).await?;
273        writer.flush().await?;
274
275        // Read response header
276        let header = {
277            let buf = self.read_line().await?;
278            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
279        };
280
281        if header == "NOT_FOUND\r\n" {
282            Err(ErrorKind::NotFound.into())
283        } else {
284            let value = header
285                .trim_end()
286                .parse::<u64>()
287                .map_err(|_| Error::from(ErrorKind::InvalidData))?;
288            Ok(value)
289        }
290    }
291
292    /// Decrement a specific integer stored with a key by a given value. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
293    /// Otherwise the new value is returned
294    pub async fn decrement<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
295        // Send command
296        let writer = self.io.get_mut();
297        let buf = &[
298            b"decr ",
299            key.as_ref(),
300            b" ",
301            amount.to_string().as_bytes(),
302            b"\r\n",
303        ]
304        .concat();
305        writer.write_all(buf).await?;
306        writer.flush().await?;
307
308        // Read response header
309        let header = {
310            let buf = self.read_line().await?;
311            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
312        };
313
314        if header == "NOT_FOUND\r\n" {
315            Err(ErrorKind::NotFound.into())
316        } else {
317            let value = header
318                .trim_end()
319                .parse::<u64>()
320                .map_err(|_| Error::from(ErrorKind::InvalidData))?;
321            Ok(value)
322        }
323    }
324
325    async fn read_line(&mut self) -> Result<&[u8], Error> {
326        let Self { io, buf } = self;
327        buf.clear();
328        io.read_until(b'\n', buf).await?;
329        if buf.last().copied() != Some(b'\n') {
330            return Err(ErrorKind::UnexpectedEof.into());
331        }
332        Ok(&buf[..])
333    }
334
335    /// Call gets to also return CAS id, which can be used to run a second CAS command
336    pub async fn gets_cas<K: AsRef<[u8]>>(&mut self, key: K) -> Result<(Vec<u8>, u64), Error> {
337        // Send command
338        let writer = self.io.get_mut();
339        writer
340            .write_all(&[b"gets ", key.as_ref(), b"\r\n"].concat())
341            .await?;
342        writer.flush().await?;
343
344        let (val, maybe_cas) = self.read_get_response().await?;
345        let cas = maybe_cas.ok_or(ErrorKind::InvalidData)?;
346
347        Ok((val, cas))
348    }
349
350    // CAS: compare and swap a value. the value of `cas` can be obtained by first making a gets_cas
351    // call
352    // returns true/false to indicate the cas operation succeeded or failed
353    // returns an error for all other failures
354    pub async fn cas<K: Display>(
355        &mut self,
356        key: K,
357        val: &[u8],
358        cas_id: u64,
359        expiration: u32,
360    ) -> Result<bool, Error> {
361        let header = format!("cas {} 0 {} {} {}\r\n", key, expiration, val.len(), cas_id);
362        self.io.write_all(header.as_bytes()).await?;
363        self.io.write_all(val).await?;
364        self.io.write_all(b"\r\n").await?;
365        self.io.flush().await?;
366
367        // Read response header
368        let header = {
369            let buf = self.read_line().await?;
370            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
371        };
372
373        /* From memcached docs:
374         *    After sending the command line and the data block the client awaits
375         *    the reply, which may be:
376         *
377         *    - "STORED\r\n", to indicate success.
378         *
379         *    - "NOT_STORED\r\n" to indicate the data was not stored, but not
380         *    because of an error. This normally means that the
381         *    condition for an "add" or a "replace" command wasn't met.
382         *
383         *    - "EXISTS\r\n" to indicate that the item you are trying to store with
384         *    a "cas" command has been modified since you last fetched it.
385         *
386         *    - "NOT_FOUND\r\n" to indicate that the item you are trying to store
387         *    with a "cas" command did not exist.
388         */
389
390        if header.starts_with("STORED") {
391            Ok(true)
392        } else if header.starts_with("EXISTS") || header.starts_with("NOT_STORED") {
393            Ok(false)
394        } else if header.starts_with("NOT FOUND") {
395            Err(ErrorKind::NotFound.into())
396        } else {
397            Err(Error::new(ErrorKind::Other, header))
398        }
399    }
400
401    /// Append bytes to the value in memcached, and creates the key if it is missing instead of failing compared to simple append
402    pub async fn append_or_vivify<K: Display>(
403        &mut self,
404        key: K,
405        val: &[u8],
406        ttl: u32,
407    ) -> Result<(), Error> {
408        /* From memcached docs:
409         * - M(token): mode switch to change behavior to add, replace, append, prepend. Takes a single character for the mode.
410         *       A: "append" command. If item exists, append the new value to its data.
411         * ----
412         * The "cas" command is supplanted by specifying the cas value with the 'C' flag.
413         * Append and Prepend modes will also respect a supplied cas value.
414         *
415         * - N(token): if in append mode, autovivify on miss with supplied TTL
416         *
417         * If N is supplied, and append reaches a miss, it will
418         * create a new item seeded with the data from the append command.
419         */
420        let header = format!("ms {} {} MA N{}\r\n", key, val.len(), ttl);
421        self.io.write_all(header.as_bytes()).await?;
422        self.io.write_all(val).await?;
423        self.io.write_all(b"\r\n").await?;
424        self.io.flush().await?;
425
426        // Read response header
427        let header = {
428            let buf = self.read_line().await?;
429            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
430        };
431
432        /* From memcached docs:
433         *    After sending the command line and the data block the client awaits
434         *    the reply, which is of the format:
435         *
436         *    <CD> <flags>*\r\n
437         *
438         *    Where CD is one of:
439         *
440         *    - "HD" (STORED), to indicate success.
441         *
442         *    - "NS" (NOT_STORED), to indicate the data was not stored, but not
443         *    because of an error.
444         *
445         *    - "EX" (EXISTS), to indicate that the item you are trying to store with
446         *    CAS semantics has been modified since you last fetched it.
447         *
448         *    - "NF" (NOT_FOUND), to indicate that the item you are trying to store
449         *    with CAS semantics did not exist.
450         */
451
452        if header.starts_with("HD") {
453            Ok(())
454        } else {
455            Err(Error::new(ErrorKind::Other, header))
456        }
457    }
458}
459
#[cfg(test)]
mod tests {
    use futures::executor::block_on;
    use futures::io::{AsyncRead, AsyncWrite};

    use std::io::{Cursor, Error, ErrorKind, Read, Write};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// In-memory stand-in for a server connection: `r` holds the canned bytes the protocol
    /// will read, `w` records everything the protocol writes.
    struct Cache {
        r: Cursor<Vec<u8>>,
        w: Cursor<Vec<u8>>,
    }

    impl Cache {
        /// A cache with nothing to read and nothing written yet.
        fn new() -> Self {
            Self::with_response(&[])
        }

        /// A cache whose read side is pre-loaded with `response`.
        fn with_response(response: &[u8]) -> Self {
            Cache {
                r: Cursor::new(response.to_vec()),
                w: Cursor::new(Vec::new()),
            }
        }
    }

    impl AsyncRead for Cache {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.r.read(buf))
        }
    }

    impl AsyncWrite for Cache {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.write(buf))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.flush())
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Runs `get("foo")` against a canned response and returns the kind of the error it produces.
    fn get_error_kind(response: &[u8]) -> ErrorKind {
        let mut cache = Cache::with_response(response);
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.get(&"foo")).unwrap_err().kind()
    }

    #[test]
    fn test_ascii_get() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get2() {
        let mut cache =
            Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\nVALUE bar 0 3\r\nbaz\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(block_on(ascii.get(&"bar")).unwrap(), b"baz");
    }

    #[test]
    fn test_ascii_get_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 99999\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let kind = block_on(ascii.get(&"foo")).unwrap_err().kind();
        assert_eq!(kind, ErrorKind::NotFound);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_eof_error() {
        let mut cache = Cache::with_response(b"EN");
        let mut ascii = super::Protocol::new(&mut cache);
        let kind = block_on(ascii.get(&"foo")).unwrap_err().kind();
        assert_eq!(kind, ErrorKind::UnexpectedEof);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_one() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_many() {
        let mut cache =
            Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nVALUE baz 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo", "baz", "blah"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(map.get("baz").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get foo baz blah\r\n");
    }

    #[test]
    fn test_ascii_get_prefix() {
        let mut cache =
            Cache::with_response(b"VALUE key 0 3\r\nbar\r\nVALUE kez 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let key_prefix = "ke";
        let map = block_on(ascii.get_prefix(&key_prefix, None)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("key").unwrap(), b"bar");
        assert_eq!(map.get("kez").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get_prefix ke\r\n");
    }

    #[test]
    fn test_ascii_get_multi_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = vec!["foo", "baz"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert!(map.is_empty());
        assert_eq!(cache.w.get_ref(), b"get foo baz\r\n");
    }

    #[test]
    fn test_ascii_get_multi_zero_keys() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let map = block_on(ascii.get_multi::<&str>(&[])).unwrap();
        assert!(map.is_empty());
        assert_eq!(cache.w.get_ref(), b"");
    }

    #[test]
    fn test_ascii_set() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::new();
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.set(&key, val.as_bytes(), ttl)).unwrap();
        let expected =
            format!("set {} 0 {} {} noreply\r\n{}\r\n", key, ttl, val.len(), val).into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_ascii_add_new_key() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap();
        let expected = format!("add {} 0 {} {}\r\n{}\r\n", key, ttl, val.len(), val).into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_ascii_add_duplicate() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"NOT_STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let kind = block_on(ascii.add(&key, val.as_bytes(), ttl))
            .unwrap_err()
            .kind();
        assert_eq!(kind, ErrorKind::AlreadyExists);
    }

    #[test]
    fn test_ascii_version() {
        let mut cache = Cache::with_response(b"VERSION 1.6.6\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.version()).unwrap(), "1.6.6");
        assert_eq!(cache.w.get_ref(), b"version\r\n");
    }

    #[test]
    fn test_ascii_flush() {
        let mut cache = Cache::with_response(b"OK\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert!(block_on(ascii.flush()).is_ok());
        assert_eq!(cache.w.get_ref(), b"flush_all\r\n");
    }

    #[test]
    fn test_ascii_increment() {
        let mut cache = Cache::with_response(b"2\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.increment("foo", 1)).unwrap(), 2);
        assert_eq!(cache.w.get_ref(), b"incr foo 1\r\n");
    }

    #[test]
    fn test_ascii_decrement() {
        let mut cache = Cache::with_response(b"0\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.decrement("foo", 1)).unwrap(), 0);
        assert_eq!(cache.w.get_ref(), b"decr foo 1\r\n");
    }

    #[test]
    fn test_ascii_gets_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 77\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(
            block_on(ascii.gets_cas(&"foo")).unwrap(),
            (b"bar".to_vec(), 77)
        );
        assert_eq!(cache.w.get_ref(), b"gets foo\r\n");
    }

    #[test]
    fn test_ascii_cas() {
        let (key, val, cas_id, ttl) = ("foo", "bar", 33, 5);
        let mut cache = Cache::with_response(b"STORED\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.cas(&key, val.as_bytes(), cas_id, ttl)).unwrap();
        let expected = format!(
            "cas {} 0 {} {} {}\r\n{}\r\n",
            key,
            ttl,
            val.len(),
            cas_id,
            val
        )
        .into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_ascii_append_or_vivify() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"HD\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.append_or_vivify(&key, val.as_bytes(), ttl)).unwrap();
        let expected = format!("ms {} {} MA N{}\r\n{}\r\n", key, val.len(), ttl, val).into_bytes();
        assert_eq!(cache.w.get_ref(), &expected);
    }

    #[test]
    fn test_response_starting_with_error() {
        assert_eq!(get_error_kind(b"ERROR\r\n"), ErrorKind::Other);
    }

    #[test]
    fn test_response_containing_error_string() {
        // "ERROR" inside the key must not be mistaken for an error reply.
        let mut cache = Cache::with_response(b"VALUE contains_ERROR 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
    }

    #[test]
    fn test_client_error_response() {
        assert_eq!(
            get_error_kind(b"CLIENT_ERROR bad command\r\n"),
            ErrorKind::Other
        );
    }

    #[test]
    fn test_server_error_response() {
        assert_eq!(
            get_error_kind(b"SERVER_ERROR out of memory\r\n"),
            ErrorKind::Other
        );
    }
}