memcache_async/
ascii.rs

1//! This is a simplified implementation of [rust-memcache](https://github.com/aisk/rust-memcache)
2//! ported for AsyncRead + AsyncWrite.
3use core::fmt::Display;
4#[cfg(feature = "with-futures")]
5use futures::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
6
7#[cfg(feature = "with-tokio")]
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9
10use std::collections::HashMap;
11use std::io::{Error, ErrorKind};
12use std::marker::Unpin;
13
14/// Memcache ASCII protocol implementation.
15pub struct Protocol<S> {
16    io: BufReader<S>,
17    buf: Vec<u8>,
18}
19
20impl<S> Protocol<S>
21where
22    S: AsyncRead + AsyncWrite + Unpin,
23{
24    /// Creates the ASCII protocol on a stream.
25    pub fn new(io: S) -> Self {
26        Self {
27            io: BufReader::new(io),
28            buf: Vec::new(),
29        }
30    }
31
32    /// Returns the value for given key as bytes. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
33    pub async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Vec<u8>, Error> {
34        // Send command
35        let writer = self.io.get_mut();
36        writer
37            .write_all(&[b"get ", key.as_ref(), b"\r\n"].concat())
38            .await?;
39        writer.flush().await?;
40
41        let (val, _) = self.read_get_response().await?;
42        Ok(val)
43    }
44
45    async fn read_get_response(&mut self) -> Result<(Vec<u8>, Option<u64>), Error> {
46        // Read response header
47        let header = self.read_line().await?;
48        let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
49
50        // Check response header and parse value length
51        if header.starts_with("ERROR")
52            || header.starts_with("CLIENT_ERROR")
53            || header.starts_with("SERVER_ERROR")
54        {
55            return Err(Error::new(ErrorKind::Other, header));
56        } else if header.starts_with("END") {
57            return Err(ErrorKind::NotFound.into());
58        }
59
60        // VALUE <key> <flags> <bytes> [<cas unique>]\r\n
61        let mut parts = header.split(' ');
62        let length: usize = parts
63            .nth(3)
64            .and_then(|len| len.trim_end().parse().ok())
65            .ok_or(ErrorKind::InvalidData)?;
66
67        // cas is present only if gets is called
68        let cas: Option<u64> = parts.next().and_then(|len| len.trim_end().parse().ok());
69
70        // Read value
71        let mut buffer: Vec<u8> = vec![0; length];
72        self.io.read_exact(&mut buffer).await?;
73
74        // Read the trailing header
75        self.read_line().await?; // \r\n
76        self.read_line().await?; // END\r\n
77
78        Ok((buffer, cas))
79    }
80
81    /// Returns values for multiple keys in a single call as a [`HashMap`] from keys to found values.
82    /// If a key is not present in memcached it will be absent from returned map.
83    pub async fn get_multi<K: AsRef<[u8]>>(
84        &mut self,
85        keys: &[K],
86    ) -> Result<HashMap<String, Vec<u8>>, Error> {
87        if keys.is_empty() {
88            return Ok(HashMap::new());
89        }
90
91        // Send command
92        let writer = self.io.get_mut();
93        writer.write_all("get".as_bytes()).await?;
94        for k in keys {
95            writer.write_all(b" ").await?;
96            writer.write_all(k.as_ref()).await?;
97        }
98        writer.write_all(b"\r\n").await?;
99        writer.flush().await?;
100
101        // Read response header
102        self.read_many_values().await
103    }
104
105    async fn read_many_values(&mut self) -> Result<HashMap<String, Vec<u8>>, Error> {
106        let mut map = HashMap::new();
107        loop {
108            let header = {
109                let buf = self.read_line().await?;
110                std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
111            }
112            .to_string();
113            let mut parts = header.split(' ');
114            match parts.next() {
115                Some("VALUE") => {
116                    if let (Some(key), _flags, Some(size_str)) =
117                        (parts.next(), parts.next(), parts.next())
118                    {
119                        let size: usize = size_str
120                            .trim_end()
121                            .parse()
122                            .map_err(|_| Error::from(ErrorKind::InvalidData))?;
123                        let mut buffer: Vec<u8> = vec![0; size];
124                        self.io.read_exact(&mut buffer).await?;
125                        let mut crlf = vec![0; 2];
126                        self.io.read_exact(&mut crlf).await?;
127
128                        map.insert(key.to_owned(), buffer);
129                    } else {
130                        return Err(Error::new(ErrorKind::InvalidData, header));
131                    }
132                }
133                Some("END\r\n") => return Ok(map),
134                Some("ERROR") => return Err(Error::new(ErrorKind::Other, header)),
135                _ => return Err(Error::new(ErrorKind::InvalidData, header)),
136            }
137        }
138    }
139
140    /// Get up to `limit` keys which match the given prefix. Returns a [HashMap] from keys to found values.
141    /// This is not part of the Memcached standard, but some servers implement it nonetheless.
142    pub async fn get_prefix<K: Display>(
143        &mut self,
144        key_prefix: K,
145        limit: Option<usize>,
146    ) -> Result<HashMap<String, Vec<u8>>, Error> {
147        // Send command
148        let header = if let Some(limit) = limit {
149            format!("get_prefix {} {}\r\n", key_prefix, limit)
150        } else {
151            format!("get_prefix {}\r\n", key_prefix)
152        };
153        self.io.write_all(header.as_bytes()).await?;
154        self.io.flush().await?;
155
156        // Read response header
157        self.read_many_values().await
158    }
159
160    /// Add a key. If the value exists, [`ErrorKind::AlreadyExists`] is returned.
161    pub async fn add<K: Display>(
162        &mut self,
163        key: K,
164        val: &[u8],
165        expiration: u32,
166    ) -> Result<(), Error> {
167        // Send command
168        let header = format!("add {} 0 {} {}\r\n", key, expiration, val.len());
169        self.io.write_all(header.as_bytes()).await?;
170        self.io.write_all(val).await?;
171        self.io.write_all(b"\r\n").await?;
172        self.io.flush().await?;
173
174        // Read response header
175        let header = {
176            let buf = self.read_line().await?;
177            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
178        };
179
180        // Check response header and parse value length
181        if header.contains("ERROR") {
182            return Err(Error::new(ErrorKind::Other, header));
183        } else if header.starts_with("NOT_STORED") {
184            return Err(ErrorKind::AlreadyExists.into());
185        }
186
187        Ok(())
188    }
189
190    /// Set key to given value and don't wait for response.
191    pub async fn set<K: Display>(
192        &mut self,
193        key: K,
194        val: &[u8],
195        expiration: u32,
196    ) -> Result<(), Error> {
197        let header = format!("set {} 0 {} {} noreply\r\n", key, expiration, val.len());
198        self.io.write_all(header.as_bytes()).await?;
199        self.io.write_all(val).await?;
200        self.io.write_all(b"\r\n").await?;
201        self.io.flush().await?;
202        Ok(())
203    }
204
205    /// Append bytes to the value in memcached and don't wait for response.
206    pub async fn append<K: Display>(&mut self, key: K, val: &[u8]) -> Result<(), Error> {
207        let header = format!("append {} 0 0 {} noreply\r\n", key, val.len());
208        self.io.write_all(header.as_bytes()).await?;
209        self.io.write_all(val).await?;
210        self.io.write_all(b"\r\n").await?;
211        self.io.flush().await?;
212        Ok(())
213    }
214
215    /// Delete a key and don't wait for response.
216    pub async fn delete<K: Display>(&mut self, key: K) -> Result<(), Error> {
217        let header = format!("delete {} noreply\r\n", key);
218        self.io.write_all(header.as_bytes()).await?;
219        self.io.flush().await?;
220        Ok(())
221    }
222
223    /// Return the version of the remote server.
224    pub async fn version(&mut self) -> Result<String, Error> {
225        self.io.write_all(b"version\r\n").await?;
226        self.io.flush().await?;
227
228        // Read response header
229        let header = {
230            let buf = self.read_line().await?;
231            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
232        };
233
234        if !header.starts_with("VERSION") {
235            return Err(Error::new(ErrorKind::Other, header));
236        }
237        let version = header.trim_start_matches("VERSION ").trim_end();
238        Ok(version.to_string())
239    }
240
241    /// Delete all keys from the cache.
242    pub async fn flush(&mut self) -> Result<(), Error> {
243        self.io.write_all(b"flush_all\r\n").await?;
244        self.io.flush().await?;
245
246        // Read response header
247        let header = {
248            let buf = self.read_line().await?;
249            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
250        };
251
252        if header == "OK\r\n" {
253            Ok(())
254        } else {
255            Err(Error::new(ErrorKind::Other, header))
256        }
257    }
258
259    /// Increment a specific integer stored with a key by a given value. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
260    /// Otherwise the new value is returned
261    pub async fn increment<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
262        // Send command
263        let writer = self.io.get_mut();
264        let buf = &[
265            b"incr ",
266            key.as_ref(),
267            b" ",
268            amount.to_string().as_bytes(),
269            b"\r\n",
270        ]
271        .concat();
272        writer.write_all(buf).await?;
273        writer.flush().await?;
274
275        // Read response header
276        let header = {
277            let buf = self.read_line().await?;
278            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
279        };
280
281        if header == "NOT_FOUND\r\n" {
282            Err(ErrorKind::NotFound.into())
283        } else {
284            let value = header
285                .trim_end()
286                .parse::<u64>()
287                .map_err(|_| Error::from(ErrorKind::InvalidData))?;
288            Ok(value)
289        }
290    }
291
292    /// Increment a specific integer stored with a key by a given value without waiting for a response.
293    pub async fn increment_noreply<K: AsRef<[u8]>>(
294        &mut self,
295        key: K,
296        amount: u64,
297    ) -> Result<(), Error> {
298        let writer = self.io.get_mut();
299        let buf = &[
300            b"incr ",
301            key.as_ref(),
302            b" ",
303            amount.to_string().as_bytes(),
304            b" noreply\r\n",
305        ]
306        .concat();
307        writer.write_all(buf).await?;
308        writer.flush().await?;
309        Ok(())
310    }
311
312    /// Decrement a specific integer stored with a key by a given value. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
313    /// Otherwise the new value is returned
314    pub async fn decrement<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
315        // Send command
316        let writer = self.io.get_mut();
317        let buf = &[
318            b"decr ",
319            key.as_ref(),
320            b" ",
321            amount.to_string().as_bytes(),
322            b"\r\n",
323        ]
324        .concat();
325        writer.write_all(buf).await?;
326        writer.flush().await?;
327
328        // Read response header
329        let header = {
330            let buf = self.read_line().await?;
331            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
332        };
333
334        if header == "NOT_FOUND\r\n" {
335            Err(ErrorKind::NotFound.into())
336        } else {
337            let value = header
338                .trim_end()
339                .parse::<u64>()
340                .map_err(|_| Error::from(ErrorKind::InvalidData))?;
341            Ok(value)
342        }
343    }
344
345    async fn read_line(&mut self) -> Result<&[u8], Error> {
346        let Self { io, buf } = self;
347        buf.clear();
348        io.read_until(b'\n', buf).await?;
349        if buf.last().copied() != Some(b'\n') {
350            return Err(ErrorKind::UnexpectedEof.into());
351        }
352        Ok(&buf[..])
353    }
354
355    /// Call gets to also return CAS id, which can be used to run a second CAS command
356    pub async fn gets_cas<K: AsRef<[u8]>>(&mut self, key: K) -> Result<(Vec<u8>, u64), Error> {
357        // Send command
358        let writer = self.io.get_mut();
359        writer
360            .write_all(&[b"gets ", key.as_ref(), b"\r\n"].concat())
361            .await?;
362        writer.flush().await?;
363
364        let (val, maybe_cas) = self.read_get_response().await?;
365        let cas = maybe_cas.ok_or(ErrorKind::InvalidData)?;
366
367        Ok((val, cas))
368    }
369
370    // CAS: compare and swap a value. the value of `cas` can be obtained by first making a gets_cas
371    // call
372    // returns true/false to indicate the cas operation succeeded or failed
373    // returns an error for all other failures
374    pub async fn cas<K: Display>(
375        &mut self,
376        key: K,
377        val: &[u8],
378        cas_id: u64,
379        expiration: u32,
380    ) -> Result<bool, Error> {
381        let header = format!("cas {} 0 {} {} {}\r\n", key, expiration, val.len(), cas_id);
382        self.io.write_all(header.as_bytes()).await?;
383        self.io.write_all(val).await?;
384        self.io.write_all(b"\r\n").await?;
385        self.io.flush().await?;
386
387        // Read response header
388        let header = {
389            let buf = self.read_line().await?;
390            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
391        };
392
393        /* From memcached docs:
394         *    After sending the command line and the data block the client awaits
395         *    the reply, which may be:
396         *
397         *    - "STORED\r\n", to indicate success.
398         *
399         *    - "NOT_STORED\r\n" to indicate the data was not stored, but not
400         *    because of an error. This normally means that the
401         *    condition for an "add" or a "replace" command wasn't met.
402         *
403         *    - "EXISTS\r\n" to indicate that the item you are trying to store with
404         *    a "cas" command has been modified since you last fetched it.
405         *
406         *    - "NOT_FOUND\r\n" to indicate that the item you are trying to store
407         *    with a "cas" command did not exist.
408         */
409
410        if header.starts_with("STORED") {
411            Ok(true)
412        } else if header.starts_with("EXISTS") || header.starts_with("NOT_STORED") {
413            Ok(false)
414        } else if header.starts_with("NOT FOUND") {
415            Err(ErrorKind::NotFound.into())
416        } else {
417            Err(Error::new(ErrorKind::Other, header))
418        }
419    }
420
421    /// Append bytes to the value in memcached, and creates the key if it is missing instead of failing compared to simple append
422    pub async fn append_or_vivify<K: Display>(
423        &mut self,
424        key: K,
425        val: &[u8],
426        ttl: u32,
427    ) -> Result<(), Error> {
428        /* From memcached docs:
429         * - M(token): mode switch to change behavior to add, replace, append, prepend. Takes a single character for the mode.
430         *       A: "append" command. If item exists, append the new value to its data.
431         * ----
432         * The "cas" command is supplanted by specifying the cas value with the 'C' flag.
433         * Append and Prepend modes will also respect a supplied cas value.
434         *
435         * - N(token): if in append mode, autovivify on miss with supplied TTL
436         *
437         * If N is supplied, and append reaches a miss, it will
438         * create a new item seeded with the data from the append command.
439         */
440        let header = format!("ms {} {} MA N{}\r\n", key, val.len(), ttl);
441        self.io.write_all(header.as_bytes()).await?;
442        self.io.write_all(val).await?;
443        self.io.write_all(b"\r\n").await?;
444        self.io.flush().await?;
445
446        // Read response header
447        let header = {
448            let buf = self.read_line().await?;
449            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
450        };
451
452        /* From memcached docs:
453         *    After sending the command line and the data block the client awaits
454         *    the reply, which is of the format:
455         *
456         *    <CD> <flags>*\r\n
457         *
458         *    Where CD is one of:
459         *
460         *    - "HD" (STORED), to indicate success.
461         *
462         *    - "NS" (NOT_STORED), to indicate the data was not stored, but not
463         *    because of an error.
464         *
465         *    - "EX" (EXISTS), to indicate that the item you are trying to store with
466         *    CAS semantics has been modified since you last fetched it.
467         *
468         *    - "NF" (NOT_FOUND), to indicate that the item you are trying to store
469         *    with CAS semantics did not exist.
470         */
471
472        if header.starts_with("HD") {
473            Ok(())
474        } else {
475            Err(Error::new(ErrorKind::Other, header))
476        }
477    }
478}
479
480#[cfg(test)]
481mod tests {
482    use futures::executor::block_on;
483    use futures::io::{AsyncRead, AsyncWrite};
484
485    use std::io::{Cursor, Error, ErrorKind, Read, Write};
486    use std::pin::Pin;
487    use std::task::{Context, Poll};
488
489    struct Cache {
490        r: Cursor<Vec<u8>>,
491        w: Cursor<Vec<u8>>,
492    }
493
494    impl Cache {
495        fn new() -> Self {
496            Cache {
497                r: Cursor::new(Vec::new()),
498                w: Cursor::new(Vec::new()),
499            }
500        }
501    }
502
503    impl AsyncRead for Cache {
504        fn poll_read(
505            self: Pin<&mut Self>,
506            _cx: &mut Context,
507            buf: &mut [u8],
508        ) -> Poll<Result<usize, Error>> {
509            Poll::Ready(self.get_mut().r.read(buf))
510        }
511    }
512
513    impl AsyncWrite for Cache {
514        fn poll_write(
515            self: Pin<&mut Self>,
516            _cx: &mut Context,
517            buf: &[u8],
518        ) -> Poll<Result<usize, Error>> {
519            Poll::Ready(self.get_mut().w.write(buf))
520        }
521
522        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
523            Poll::Ready(self.get_mut().w.flush())
524        }
525
526        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
527            Poll::Ready(Ok(()))
528        }
529    }
530
531    #[test]
532    fn test_ascii_get() {
533        let mut cache = Cache::new();
534        cache
535            .r
536            .get_mut()
537            .extend_from_slice(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
538        let mut ascii = super::Protocol::new(&mut cache);
539        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
540        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
541    }
542
543    #[test]
544    fn test_ascii_get2() {
545        let mut cache = Cache::new();
546        cache
547            .r
548            .get_mut()
549            .extend_from_slice(b"VALUE foo 0 3\r\nbar\r\nEND\r\nVALUE bar 0 3\r\nbaz\r\nEND\r\n");
550        let mut ascii = super::Protocol::new(&mut cache);
551        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
552        assert_eq!(block_on(ascii.get(&"bar")).unwrap(), b"baz");
553    }
554
555    #[test]
556    fn test_ascii_get_cas() {
557        let mut cache = Cache::new();
558        cache
559            .r
560            .get_mut()
561            .extend_from_slice(b"VALUE foo 0 3 99999\r\nbar\r\nEND\r\n");
562        let mut ascii = super::Protocol::new(&mut cache);
563        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
564        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
565    }
566
567    #[test]
568    fn test_ascii_get_empty() {
569        let mut cache = Cache::new();
570        cache.r.get_mut().extend_from_slice(b"END\r\n");
571        let mut ascii = super::Protocol::new(&mut cache);
572        assert_eq!(
573            block_on(ascii.get(&"foo")).unwrap_err().kind(),
574            ErrorKind::NotFound
575        );
576        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
577    }
578
579    #[test]
580    fn test_ascii_get_eof_error() {
581        let mut cache = Cache::new();
582        cache.r.get_mut().extend_from_slice(b"EN");
583        let mut ascii = super::Protocol::new(&mut cache);
584        assert_eq!(
585            block_on(ascii.get(&"foo")).unwrap_err().kind(),
586            ErrorKind::UnexpectedEof
587        );
588        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
589    }
590
591    #[test]
592    fn test_ascii_get_one() {
593        let mut cache = Cache::new();
594        cache
595            .r
596            .get_mut()
597            .extend_from_slice(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
598        let mut ascii = super::Protocol::new(&mut cache);
599        let keys = vec!["foo"];
600        let map = block_on(ascii.get_multi(&keys)).unwrap();
601        assert_eq!(map.len(), 1);
602        assert_eq!(map.get("foo").unwrap(), b"bar");
603        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
604    }
605
606    #[test]
607    fn test_ascii_get_many() {
608        let mut cache = Cache::new();
609        cache
610            .r
611            .get_mut()
612            .extend_from_slice(b"VALUE foo 0 3\r\nbar\r\nVALUE baz 44 4\r\ncrux\r\nEND\r\n");
613        let mut ascii = super::Protocol::new(&mut cache);
614        let keys = vec!["foo", "baz", "blah"];
615        let map = block_on(ascii.get_multi(&keys)).unwrap();
616        assert_eq!(map.len(), 2);
617        assert_eq!(map.get("foo").unwrap(), b"bar");
618        assert_eq!(map.get("baz").unwrap(), b"crux");
619        assert_eq!(cache.w.get_ref(), b"get foo baz blah\r\n");
620    }
621
622    #[test]
623    fn test_ascii_get_prefix() {
624        let mut cache = Cache::new();
625        cache
626            .r
627            .get_mut()
628            .extend_from_slice(b"VALUE key 0 3\r\nbar\r\nVALUE kez 44 4\r\ncrux\r\nEND\r\n");
629        let mut ascii = super::Protocol::new(&mut cache);
630        let key_prefix = "ke";
631        let map = block_on(ascii.get_prefix(&key_prefix, None)).unwrap();
632        assert_eq!(map.len(), 2);
633        assert_eq!(map.get("key").unwrap(), b"bar");
634        assert_eq!(map.get("kez").unwrap(), b"crux");
635        assert_eq!(cache.w.get_ref(), b"get_prefix ke\r\n");
636    }
637
638    #[test]
639    fn test_ascii_get_multi_empty() {
640        let mut cache = Cache::new();
641        cache.r.get_mut().extend_from_slice(b"END\r\n");
642        let mut ascii = super::Protocol::new(&mut cache);
643        let keys = vec!["foo", "baz"];
644        let map = block_on(ascii.get_multi(&keys)).unwrap();
645        assert!(map.is_empty());
646        assert_eq!(cache.w.get_ref(), b"get foo baz\r\n");
647    }
648
649    #[test]
650    fn test_ascii_get_multi_zero_keys() {
651        let mut cache = Cache::new();
652        cache.r.get_mut().extend_from_slice(b"END\r\n");
653        let mut ascii = super::Protocol::new(&mut cache);
654        let map = block_on(ascii.get_multi::<&str>(&[])).unwrap();
655        assert!(map.is_empty());
656        assert_eq!(cache.w.get_ref(), b"");
657    }
658
659    #[test]
660    fn test_ascii_set() {
661        let (key, val, ttl) = ("foo", "bar", 5);
662        let mut cache = Cache::new();
663        let mut ascii = super::Protocol::new(&mut cache);
664        block_on(ascii.set(&key, val.as_bytes(), ttl)).unwrap();
665        assert_eq!(
666            cache.w.get_ref(),
667            &format!("set {} 0 {} {} noreply\r\n{}\r\n", key, ttl, val.len(), val)
668                .as_bytes()
669                .to_vec()
670        );
671    }
672
673    #[test]
674    fn test_ascii_increment_noreply() {
675        let (key, amount) = ("foo", 5);
676        let mut cache = Cache::new();
677        let mut ascii = super::Protocol::new(&mut cache);
678        block_on(ascii.increment_noreply(key.as_bytes(), amount)).unwrap();
679        assert_eq!(
680            cache.w.get_ref(),
681            &format!("incr {} {} noreply\r\n", key, amount)
682                .as_bytes()
683                .to_vec()
684        );
685    }
686
687    #[test]
688    fn test_ascii_add_new_key() {
689        let (key, val, ttl) = ("foo", "bar", 5);
690        let mut cache = Cache::new();
691        cache.r.get_mut().extend_from_slice(b"STORED\r\n");
692        let mut ascii = super::Protocol::new(&mut cache);
693        block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap();
694        assert_eq!(
695            cache.w.get_ref(),
696            &format!("add {} 0 {} {}\r\n{}\r\n", key, ttl, val.len(), val)
697                .as_bytes()
698                .to_vec()
699        );
700    }
701
702    #[test]
703    fn test_ascii_add_duplicate() {
704        let (key, val, ttl) = ("foo", "bar", 5);
705        let mut cache = Cache::new();
706        cache.r.get_mut().extend_from_slice(b"NOT_STORED\r\n");
707        let mut ascii = super::Protocol::new(&mut cache);
708        assert_eq!(
709            block_on(ascii.add(&key, val.as_bytes(), ttl))
710                .unwrap_err()
711                .kind(),
712            ErrorKind::AlreadyExists
713        );
714    }
715
716    #[test]
717    fn test_ascii_version() {
718        let mut cache = Cache::new();
719        cache.r.get_mut().extend_from_slice(b"VERSION 1.6.6\r\n");
720        let mut ascii = super::Protocol::new(&mut cache);
721        assert_eq!(block_on(ascii.version()).unwrap(), "1.6.6");
722        assert_eq!(cache.w.get_ref(), b"version\r\n");
723    }
724
725    #[test]
726    fn test_ascii_flush() {
727        let mut cache = Cache::new();
728        cache.r.get_mut().extend_from_slice(b"OK\r\n");
729        let mut ascii = super::Protocol::new(&mut cache);
730        assert!(block_on(ascii.flush()).is_ok());
731        assert_eq!(cache.w.get_ref(), b"flush_all\r\n");
732    }
733
734    #[test]
735    fn test_ascii_increment() {
736        let mut cache = Cache::new();
737        cache.r.get_mut().extend_from_slice(b"2\r\n");
738        let mut ascii = super::Protocol::new(&mut cache);
739        assert_eq!(block_on(ascii.increment("foo", 1)).unwrap(), 2);
740        assert_eq!(cache.w.get_ref(), b"incr foo 1\r\n");
741    }
742
743    #[test]
744    fn test_ascii_decrement() {
745        let mut cache = Cache::new();
746        cache.r.get_mut().extend_from_slice(b"0\r\n");
747        let mut ascii = super::Protocol::new(&mut cache);
748        assert_eq!(block_on(ascii.decrement("foo", 1)).unwrap(), 0);
749        assert_eq!(cache.w.get_ref(), b"decr foo 1\r\n");
750    }
751
752    #[test]
753    fn test_ascii_gets_cas() {
754        let mut cache = Cache::new();
755        cache
756            .r
757            .get_mut()
758            .extend_from_slice(b"VALUE foo 0 3 77\r\nbar\r\nEND\r\n");
759        let mut ascii = super::Protocol::new(&mut cache);
760        assert_eq!(
761            block_on(ascii.gets_cas(&"foo")).unwrap(),
762            (Vec::from(b"bar"), 77)
763        );
764        assert_eq!(cache.w.get_ref(), b"gets foo\r\n");
765    }
766
767    #[test]
768    fn test_ascii_cas() {
769        let (key, val, cas_id, ttl) = ("foo", "bar", 33, 5);
770        let mut cache = Cache::new();
771        cache
772            .r
773            .get_mut()
774            .extend_from_slice(b"STORED\r\nbar\r\nEND\r\n");
775        let mut ascii = super::Protocol::new(&mut cache);
776        block_on(ascii.cas(&key, val.as_bytes(), cas_id, ttl)).unwrap();
777        assert_eq!(
778            cache.w.get_ref(),
779            &format!(
780                "cas {} 0 {} {} {}\r\n{}\r\n",
781                key,
782                ttl,
783                val.len(),
784                cas_id,
785                val
786            )
787            .as_bytes()
788            .to_vec()
789        );
790    }
791
792    #[test]
793    fn test_ascii_append_or_vivify() {
794        let (key, val, ttl) = ("foo", "bar", 5);
795        let mut cache = Cache::new();
796        cache.r.get_mut().extend_from_slice(b"HD\r\nbar\r\nEND\r\n");
797        let mut ascii = super::Protocol::new(&mut cache);
798        block_on(ascii.append_or_vivify(&key, val.as_bytes(), ttl)).unwrap();
799        assert_eq!(
800            cache.w.get_ref(),
801            &format!("ms {} {} MA N{}\r\n{}\r\n", key, val.len(), ttl, val)
802                .as_bytes()
803                .to_vec()
804        );
805    }
806
807    #[test]
808    fn test_response_starting_with_error() {
809        let mut cache = Cache::new();
810        cache.r.get_mut().extend_from_slice(b"ERROR\r\n");
811        let mut ascii = super::Protocol::new(&mut cache);
812        let err = block_on(ascii.get(&"foo")).unwrap_err();
813        assert_eq!(err.kind(), ErrorKind::Other);
814    }
815
816    #[test]
817    fn test_response_containing_error_string() {
818        let mut cache = Cache::new();
819        cache
820            .r
821            .get_mut()
822            .extend_from_slice(b"VALUE contains_ERROR 0 3\r\nbar\r\nEND\r\n");
823        let mut ascii = super::Protocol::new(&mut cache);
824        assert_eq!(block_on(ascii.get(&"foo")).unwrap(), b"bar");
825    }
826
827    #[test]
828    fn test_client_error_response() {
829        let mut cache = Cache::new();
830        cache
831            .r
832            .get_mut()
833            .extend_from_slice(b"CLIENT_ERROR bad command\r\n");
834        let mut ascii = super::Protocol::new(&mut cache);
835        let err = block_on(ascii.get(&"foo")).unwrap_err();
836        assert_eq!(err.kind(), ErrorKind::Other);
837    }
838
839    #[test]
840    fn test_server_error_response() {
841        let mut cache = Cache::new();
842        cache
843            .r
844            .get_mut()
845            .extend_from_slice(b"SERVER_ERROR out of memory\r\n");
846        let mut ascii = super::Protocol::new(&mut cache);
847        let err = block_on(ascii.get(&"foo")).unwrap_err();
848        assert_eq!(err.kind(), ErrorKind::Other);
849    }
850}