Skip to main content

memcache_async/
ascii.rs

1//! This is a simplified implementation of [rust-memcache](https://github.com/aisk/rust-memcache)
2//! ported for AsyncRead + AsyncWrite.
3use core::fmt::Display;
4#[cfg(feature = "with-futures")]
5use futures::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
6
7#[cfg(feature = "with-tokio")]
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9
10use std::collections::HashMap;
11use std::io::{Error, ErrorKind};
12use std::marker::Unpin;
13
/// Returns `true` when a response header line is one of memcached's error replies.
///
/// A header counts as an error when it begins with `ERROR`, `CLIENT_ERROR` or
/// `SERVER_ERROR`; the match is on the start of the line only, so those words
/// appearing later in the line (e.g. inside a key) do not count.
fn is_memcached_error(header: &str) -> bool {
    const ERROR_PREFIXES: [&str; 3] = ["ERROR", "CLIENT_ERROR", "SERVER_ERROR"];
    ERROR_PREFIXES
        .iter()
        .any(|prefix| header.starts_with(*prefix))
}
20
21/// Memcache ASCII protocol implementation.
22pub struct Protocol<S> {
23    io: BufReader<S>,
24    buf: Vec<u8>,
25}
26
27impl<S> Protocol<S>
28where
29    S: AsyncRead + AsyncWrite + Unpin,
30{
31    /// Creates the ASCII protocol on a stream.
32    pub fn new(io: S) -> Self {
33        Self {
34            io: BufReader::new(io),
35            buf: Vec::new(),
36        }
37    }
38
39    /// Returns the value for given key as bytes. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
40    pub async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Vec<u8>, Error> {
41        // Send command
42        let writer = self.io.get_mut();
43        writer
44            .write_all(&[b"get ", key.as_ref(), b"\r\n"].concat())
45            .await?;
46        writer.flush().await?;
47
48        let (val, _) = self.read_get_response().await?;
49        Ok(val)
50    }
51
52    async fn read_get_response(&mut self) -> Result<(Vec<u8>, Option<u64>), Error> {
53        // Read response header
54        let header = self.read_line().await?;
55        let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
56
57        // Check response header and parse value length
58        if is_memcached_error(header) {
59            return Err(Error::new(ErrorKind::Other, header));
60        } else if header.starts_with("END") {
61            return Err(ErrorKind::NotFound.into());
62        }
63
64        // VALUE <key> <flags> <bytes> [<cas unique>]\r\n
65        let mut parts = header.split(' ');
66        let length: usize = parts
67            .nth(3)
68            .and_then(|len| len.trim_end().parse().ok())
69            .ok_or(ErrorKind::InvalidData)?;
70
71        // cas is present only if gets is called
72        let cas: Option<u64> = parts.next().and_then(|len| len.trim_end().parse().ok());
73
74        // Read value
75        let mut buffer: Vec<u8> = vec![0; length];
76        self.io.read_exact(&mut buffer).await?;
77
78        // Read the trailing header
79        self.read_line().await?; // \r\n
80        self.read_line().await?; // END\r\n
81
82        Ok((buffer, cas))
83    }
84
85    /// Returns values for multiple keys in a single call as a [`HashMap`] from keys to found values.
86    /// If a key is not present in memcached it will be absent from returned map.
87    pub async fn get_multi<K: AsRef<[u8]>>(
88        &mut self,
89        keys: &[K],
90    ) -> Result<HashMap<String, Vec<u8>>, Error> {
91        if keys.is_empty() {
92            return Ok(HashMap::new());
93        }
94
95        // Send command
96        let writer = self.io.get_mut();
97        writer.write_all("get".as_bytes()).await?;
98        for k in keys {
99            writer.write_all(b" ").await?;
100            writer.write_all(k.as_ref()).await?;
101        }
102        writer.write_all(b"\r\n").await?;
103        writer.flush().await?;
104
105        // Read response header
106        self.read_many_values().await
107    }
108
109    async fn read_many_values(&mut self) -> Result<HashMap<String, Vec<u8>>, Error> {
110        let mut map = HashMap::new();
111        loop {
112            let header = {
113                let buf = self.read_line().await?;
114                std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
115            }
116            .to_string();
117            let mut parts = header.split(' ');
118            match parts.next() {
119                Some("VALUE") => {
120                    if let (Some(key), _flags, Some(size_str)) =
121                        (parts.next(), parts.next(), parts.next())
122                    {
123                        let size: usize = size_str
124                            .trim_end()
125                            .parse()
126                            .map_err(|_| Error::from(ErrorKind::InvalidData))?;
127                        let mut buffer: Vec<u8> = vec![0; size];
128                        self.io.read_exact(&mut buffer).await?;
129                        let mut crlf = vec![0; 2];
130                        self.io.read_exact(&mut crlf).await?;
131
132                        map.insert(key.to_owned(), buffer);
133                    } else {
134                        return Err(Error::new(ErrorKind::InvalidData, header));
135                    }
136                }
137                Some("END\r\n") => return Ok(map),
138                Some("ERROR") => return Err(Error::new(ErrorKind::Other, header)),
139                _ => return Err(Error::new(ErrorKind::InvalidData, header)),
140            }
141        }
142    }
143
144    /// Get up to `limit` keys which match the given prefix. Returns a [HashMap] from keys to found values.
145    /// This is not part of the Memcached standard, but some servers implement it nonetheless.
146    pub async fn get_prefix<K: Display>(
147        &mut self,
148        key_prefix: K,
149        limit: Option<usize>,
150    ) -> Result<HashMap<String, Vec<u8>>, Error> {
151        // Send command
152        let header = if let Some(limit) = limit {
153            format!("get_prefix {} {}\r\n", key_prefix, limit)
154        } else {
155            format!("get_prefix {}\r\n", key_prefix)
156        };
157        self.io.write_all(header.as_bytes()).await?;
158        self.io.flush().await?;
159
160        // Read response header
161        self.read_many_values().await
162    }
163
164    /// Add a key. If the value exists, [`ErrorKind::AlreadyExists`] is returned.
165    pub async fn add<K: Display>(
166        &mut self,
167        key: K,
168        val: &[u8],
169        expiration: u32,
170    ) -> Result<(), Error> {
171        // Send command
172        let header = format!("add {} 0 {} {}\r\n", key, expiration, val.len());
173        self.io.write_all(header.as_bytes()).await?;
174        self.io.write_all(val).await?;
175        self.io.write_all(b"\r\n").await?;
176        self.io.flush().await?;
177
178        // Read response header
179        let header = {
180            let buf = self.read_line().await?;
181            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
182        };
183
184        // Check response header and parse value length
185        if is_memcached_error(header) {
186            return Err(Error::new(ErrorKind::Other, header));
187        } else if header.starts_with("NOT_STORED") {
188            return Err(ErrorKind::AlreadyExists.into());
189        }
190
191        Ok(())
192    }
193
194    /// Set key to given value and don't wait for response.
195    pub async fn set<K: Display>(
196        &mut self,
197        key: K,
198        val: &[u8],
199        expiration: u32,
200    ) -> Result<(), Error> {
201        let header = format!("set {} 0 {} {} noreply\r\n", key, expiration, val.len());
202        self.io.write_all(header.as_bytes()).await?;
203        self.io.write_all(val).await?;
204        self.io.write_all(b"\r\n").await?;
205        self.io.flush().await?;
206        Ok(())
207    }
208
209    /// Append bytes to the value in memcached and don't wait for response.
210    pub async fn append<K: Display>(&mut self, key: K, val: &[u8]) -> Result<(), Error> {
211        let header = format!("append {} 0 0 {} noreply\r\n", key, val.len());
212        self.io.write_all(header.as_bytes()).await?;
213        self.io.write_all(val).await?;
214        self.io.write_all(b"\r\n").await?;
215        self.io.flush().await?;
216        Ok(())
217    }
218
219    /// Delete a key and don't wait for response.
220    pub async fn delete<K: Display>(&mut self, key: K) -> Result<(), Error> {
221        let header = format!("delete {} noreply\r\n", key);
222        self.io.write_all(header.as_bytes()).await?;
223        self.io.flush().await?;
224        Ok(())
225    }
226
227    /// Return the version of the remote server.
228    pub async fn version(&mut self) -> Result<String, Error> {
229        self.io.write_all(b"version\r\n").await?;
230        self.io.flush().await?;
231
232        // Read response header
233        let header = {
234            let buf = self.read_line().await?;
235            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
236        };
237
238        if !header.starts_with("VERSION") {
239            return Err(Error::new(ErrorKind::Other, header));
240        }
241        let version = header.trim_start_matches("VERSION ").trim_end();
242        Ok(version.to_string())
243    }
244
245    /// Delete all keys from the cache.
246    pub async fn flush(&mut self) -> Result<(), Error> {
247        self.io.write_all(b"flush_all\r\n").await?;
248        self.io.flush().await?;
249
250        // Read response header
251        let header = {
252            let buf = self.read_line().await?;
253            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
254        };
255
256        if header == "OK\r\n" {
257            Ok(())
258        } else {
259            Err(Error::new(ErrorKind::Other, header))
260        }
261    }
262
263    /// Increment a specific integer stored with a key by a given value. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
264    /// Otherwise the new value is returned.
265    ///
266    /// Note: If the key exists but contains a non-numeric value, memcached will return a CLIENT_ERROR.
267    pub async fn increment<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
268        // Send command
269        let writer = self.io.get_mut();
270        let buf = &[
271            b"incr ",
272            key.as_ref(),
273            b" ",
274            amount.to_string().as_bytes(),
275            b"\r\n",
276        ]
277        .concat();
278        writer.write_all(buf).await?;
279        writer.flush().await?;
280
281        // Read response header
282        let header = {
283            let buf = self.read_line().await?;
284            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
285        };
286
287        if header == "NOT_FOUND\r\n" {
288            Err(ErrorKind::NotFound.into())
289        } else if is_memcached_error(header) {
290            Err(Error::new(ErrorKind::Other, header))
291        } else {
292            let value = header
293                .trim_end()
294                .parse::<u64>()
295                .map_err(|_| Error::from(ErrorKind::InvalidData))?;
296            Ok(value)
297        }
298    }
299
300    /// Increment a specific integer stored with a key by a given value without waiting for a response.
301    pub async fn increment_noreply<K: AsRef<[u8]>>(
302        &mut self,
303        key: K,
304        amount: u64,
305    ) -> Result<(), Error> {
306        let writer = self.io.get_mut();
307        let buf = &[
308            b"incr ",
309            key.as_ref(),
310            b" ",
311            amount.to_string().as_bytes(),
312            b" noreply\r\n",
313        ]
314        .concat();
315        writer.write_all(buf).await?;
316        writer.flush().await?;
317        Ok(())
318    }
319
320    /// Decrement a specific integer stored with a key by a given value. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
321    /// Otherwise the new value is returned.
322    ///
323    /// Note: If the key exists but contains a non-numeric value, memcached will return a CLIENT_ERROR.
324    pub async fn decrement<K: AsRef<[u8]>>(&mut self, key: K, amount: u64) -> Result<u64, Error> {
325        // Send command
326        let writer = self.io.get_mut();
327        let buf = &[
328            b"decr ",
329            key.as_ref(),
330            b" ",
331            amount.to_string().as_bytes(),
332            b"\r\n",
333        ]
334        .concat();
335        writer.write_all(buf).await?;
336        writer.flush().await?;
337
338        // Read response header
339        let header = {
340            let buf = self.read_line().await?;
341            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
342        };
343
344        if header == "NOT_FOUND\r\n" {
345            Err(ErrorKind::NotFound.into())
346        } else if is_memcached_error(header) {
347            Err(Error::new(ErrorKind::Other, header))
348        } else {
349            let value = header
350                .trim_end()
351                .parse::<u64>()
352                .map_err(|_| Error::from(ErrorKind::InvalidData))?;
353            Ok(value)
354        }
355    }
356
357    async fn read_line(&mut self) -> Result<&[u8], Error> {
358        let Self { io, buf } = self;
359        buf.clear();
360        io.read_until(b'\n', buf).await?;
361        if buf.last().copied() != Some(b'\n') {
362            return Err(ErrorKind::UnexpectedEof.into());
363        }
364        Ok(&buf[..])
365    }
366
367    /// Call gets to also return CAS id, which can be used to run a second CAS command
368    pub async fn gets_cas<K: AsRef<[u8]>>(&mut self, key: K) -> Result<(Vec<u8>, u64), Error> {
369        // Send command
370        let writer = self.io.get_mut();
371        writer
372            .write_all(&[b"gets ", key.as_ref(), b"\r\n"].concat())
373            .await?;
374        writer.flush().await?;
375
376        let (val, maybe_cas) = self.read_get_response().await?;
377        let cas = maybe_cas.ok_or(ErrorKind::InvalidData)?;
378
379        Ok((val, cas))
380    }
381
382    // CAS: compare and swap a value. the value of `cas` can be obtained by first making a gets_cas
383    // call
384    // returns true/false to indicate the cas operation succeeded or failed
385    // returns an error for all other failures
386    pub async fn cas<K: Display>(
387        &mut self,
388        key: K,
389        val: &[u8],
390        cas_id: u64,
391        expiration: u32,
392    ) -> Result<bool, Error> {
393        let header = format!("cas {} 0 {} {} {}\r\n", key, expiration, val.len(), cas_id);
394        self.io.write_all(header.as_bytes()).await?;
395        self.io.write_all(val).await?;
396        self.io.write_all(b"\r\n").await?;
397        self.io.flush().await?;
398
399        // Read response header
400        let header = {
401            let buf = self.read_line().await?;
402            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
403        };
404
405        /* From memcached docs:
406         *    After sending the command line and the data block the client awaits
407         *    the reply, which may be:
408         *
409         *    - "STORED\r\n", to indicate success.
410         *
411         *    - "NOT_STORED\r\n" to indicate the data was not stored, but not
412         *    because of an error. This normally means that the
413         *    condition for an "add" or a "replace" command wasn't met.
414         *
415         *    - "EXISTS\r\n" to indicate that the item you are trying to store with
416         *    a "cas" command has been modified since you last fetched it.
417         *
418         *    - "NOT_FOUND\r\n" to indicate that the item you are trying to store
419         *    with a "cas" command did not exist.
420         */
421
422        if header.starts_with("STORED") {
423            Ok(true)
424        } else if header.starts_with("EXISTS") || header.starts_with("NOT_STORED") {
425            Ok(false)
426        } else if header.starts_with("NOT FOUND") {
427            Err(ErrorKind::NotFound.into())
428        } else {
429            Err(Error::new(ErrorKind::Other, header))
430        }
431    }
432
433    /// Append bytes to the value in memcached, and creates the key if it is missing instead of failing compared to simple append
434    pub async fn append_or_vivify<K: Display>(
435        &mut self,
436        key: K,
437        val: &[u8],
438        ttl: u32,
439    ) -> Result<(), Error> {
440        /* From memcached docs:
441         * - M(token): mode switch to change behavior to add, replace, append, prepend. Takes a single character for the mode.
442         *       A: "append" command. If item exists, append the new value to its data.
443         * ----
444         * The "cas" command is supplanted by specifying the cas value with the 'C' flag.
445         * Append and Prepend modes will also respect a supplied cas value.
446         *
447         * - N(token): if in append mode, autovivify on miss with supplied TTL
448         *
449         * If N is supplied, and append reaches a miss, it will
450         * create a new item seeded with the data from the append command.
451         */
452        let header = format!("ms {} {} MA N{}\r\n", key, val.len(), ttl);
453        self.io.write_all(header.as_bytes()).await?;
454        self.io.write_all(val).await?;
455        self.io.write_all(b"\r\n").await?;
456        self.io.flush().await?;
457
458        // Read response header
459        let header = {
460            let buf = self.read_line().await?;
461            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
462        };
463
464        /* From memcached docs:
465         *    After sending the command line and the data block the client awaits
466         *    the reply, which is of the format:
467         *
468         *    <CD> <flags>*\r\n
469         *
470         *    Where CD is one of:
471         *
472         *    - "HD" (STORED), to indicate success.
473         *
474         *    - "NS" (NOT_STORED), to indicate the data was not stored, but not
475         *    because of an error.
476         *
477         *    - "EX" (EXISTS), to indicate that the item you are trying to store with
478         *    CAS semantics has been modified since you last fetched it.
479         *
480         *    - "NF" (NOT_FOUND), to indicate that the item you are trying to store
481         *    with CAS semantics did not exist.
482         */
483
484        if header.starts_with("HD") {
485            Ok(())
486        } else {
487            Err(Error::new(ErrorKind::Other, header))
488        }
489    }
490}
491
#[cfg(test)]
mod tests {
    use futures::executor::block_on;
    use futures::io::{AsyncRead, AsyncWrite};

    use std::io::{Cursor, Error, ErrorKind, Read, Write};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// In-memory stand-in for a connection: reads are served from `r`, writes land in `w`.
    struct Cache {
        r: Cursor<Vec<u8>>,
        w: Cursor<Vec<u8>>,
    }

    impl Cache {
        /// Creates a cache with nothing to read and nothing written yet.
        fn new() -> Self {
            Cache {
                r: Cursor::new(Vec::new()),
                w: Cursor::new(Vec::new()),
            }
        }

        /// Creates a cache whose read side is pre-loaded with `response`.
        fn with_response(response: &[u8]) -> Self {
            let mut cache = Self::new();
            cache.r.get_mut().extend_from_slice(response);
            cache
        }
    }

    impl AsyncRead for Cache {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.r.read(buf))
        }
    }

    impl AsyncWrite for Cache {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.write(buf))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            let this = self.get_mut();
            Poll::Ready(this.w.flush())
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

    #[test]
    fn test_ascii_get() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let value = block_on(ascii.get(&"foo")).unwrap();
        assert_eq!(value, b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get2() {
        let mut cache =
            Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\nVALUE bar 0 3\r\nbaz\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let first = block_on(ascii.get(&"foo")).unwrap();
        assert_eq!(first, b"bar");
        let second = block_on(ascii.get(&"bar")).unwrap();
        assert_eq!(second, b"baz");
    }

    #[test]
    fn test_ascii_get_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 99999\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let value = block_on(ascii.get(&"foo")).unwrap();
        assert_eq!(value, b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::NotFound);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_eof_error() {
        let mut cache = Cache::with_response(b"EN");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_one() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = ["foo"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(cache.w.get_ref(), b"get foo\r\n");
    }

    #[test]
    fn test_ascii_get_many() {
        let mut cache =
            Cache::with_response(b"VALUE foo 0 3\r\nbar\r\nVALUE baz 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = ["foo", "baz", "blah"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("foo").unwrap(), b"bar");
        assert_eq!(map.get("baz").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get foo baz blah\r\n");
    }

    #[test]
    fn test_ascii_get_prefix() {
        let mut cache =
            Cache::with_response(b"VALUE key 0 3\r\nbar\r\nVALUE kez 44 4\r\ncrux\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let key_prefix = "ke";
        let map = block_on(ascii.get_prefix(&key_prefix, None)).unwrap();
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("key").unwrap(), b"bar");
        assert_eq!(map.get("kez").unwrap(), b"crux");
        assert_eq!(cache.w.get_ref(), b"get_prefix ke\r\n");
    }

    #[test]
    fn test_ascii_get_multi_empty() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let keys = ["foo", "baz"];
        let map = block_on(ascii.get_multi(&keys)).unwrap();
        assert!(map.is_empty());
        assert_eq!(cache.w.get_ref(), b"get foo baz\r\n");
    }

    #[test]
    fn test_ascii_get_multi_zero_keys() {
        let mut cache = Cache::with_response(b"END\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let map = block_on(ascii.get_multi::<&str>(&[])).unwrap();
        assert!(map.is_empty());
        assert_eq!(cache.w.get_ref(), b"");
    }

    #[test]
    fn test_ascii_set() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::new();
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.set(&key, val.as_bytes(), ttl)).unwrap();
        let expected = format!("set {} 0 {} {} noreply\r\n{}\r\n", key, ttl, val.len(), val);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_increment_noreply() {
        let (key, amount) = ("foo", 5);
        let mut cache = Cache::new();
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.increment_noreply(key.as_bytes(), amount)).unwrap();
        let expected = format!("incr {} {} noreply\r\n", key, amount);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_add_new_key() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap();
        let expected = format!("add {} 0 {} {}\r\n{}\r\n", key, ttl, val.len(), val);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_add_duplicate() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"NOT_STORED\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.add(&key, val.as_bytes(), ttl)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::AlreadyExists);
    }

    #[test]
    fn test_ascii_version() {
        let mut cache = Cache::with_response(b"VERSION 1.6.6\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let version = block_on(ascii.version()).unwrap();
        assert_eq!(version, "1.6.6");
        assert_eq!(cache.w.get_ref(), b"version\r\n");
    }

    #[test]
    fn test_ascii_flush() {
        let mut cache = Cache::with_response(b"OK\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let outcome = block_on(ascii.flush());
        assert!(outcome.is_ok());
        assert_eq!(cache.w.get_ref(), b"flush_all\r\n");
    }

    #[test]
    fn test_ascii_increment() {
        let mut cache = Cache::with_response(b"2\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let value = block_on(ascii.increment("foo", 1)).unwrap();
        assert_eq!(value, 2);
        assert_eq!(cache.w.get_ref(), b"incr foo 1\r\n");
    }

    #[test]
    fn test_ascii_decrement() {
        let mut cache = Cache::with_response(b"0\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let value = block_on(ascii.decrement("foo", 1)).unwrap();
        assert_eq!(value, 0);
        assert_eq!(cache.w.get_ref(), b"decr foo 1\r\n");
    }

    #[test]
    fn test_ascii_increment_not_found() {
        let mut cache = Cache::with_response(b"NOT_FOUND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::NotFound);
    }

    #[test]
    fn test_ascii_increment_client_error() {
        let mut cache = Cache::with_response(
            b"CLIENT_ERROR cannot increment or decrement non-numeric value\r\n",
        );
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
        let message = err.to_string();
        assert!(message.contains("cannot increment or decrement non-numeric value"));
    }

    #[test]
    fn test_ascii_increment_server_error() {
        let mut cache = Cache::with_response(b"SERVER_ERROR out of memory\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
        assert!(err.to_string().contains("out of memory"));
    }

    #[test]
    fn test_ascii_increment_error() {
        let mut cache = Cache::with_response(b"ERROR\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.increment("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_ascii_decrement_not_found() {
        let mut cache = Cache::with_response(b"NOT_FOUND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.decrement("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::NotFound);
    }

    #[test]
    fn test_ascii_decrement_client_error() {
        let mut cache = Cache::with_response(
            b"CLIENT_ERROR cannot increment or decrement non-numeric value\r\n",
        );
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.decrement("foo", 1)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
        let message = err.to_string();
        assert!(message.contains("cannot increment or decrement non-numeric value"));
    }

    #[test]
    fn test_ascii_gets_cas() {
        let mut cache = Cache::with_response(b"VALUE foo 0 3 77\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let fetched = block_on(ascii.gets_cas(&"foo")).unwrap();
        assert_eq!(fetched, (b"bar".to_vec(), 77));
        assert_eq!(cache.w.get_ref(), b"gets foo\r\n");
    }

    #[test]
    fn test_ascii_cas() {
        let (key, val, cas_id, ttl) = ("foo", "bar", 33, 5);
        let mut cache = Cache::with_response(b"STORED\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.cas(&key, val.as_bytes(), cas_id, ttl)).unwrap();
        let expected = format!(
            "cas {} 0 {} {} {}\r\n{}\r\n",
            key,
            ttl,
            val.len(),
            cas_id,
            val
        );
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_ascii_append_or_vivify() {
        let (key, val, ttl) = ("foo", "bar", 5);
        let mut cache = Cache::with_response(b"HD\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        block_on(ascii.append_or_vivify(&key, val.as_bytes(), ttl)).unwrap();
        let expected = format!("ms {} {} MA N{}\r\n{}\r\n", key, val.len(), ttl, val);
        assert_eq!(cache.w.get_ref().as_slice(), expected.as_bytes());
    }

    #[test]
    fn test_response_starting_with_error() {
        let mut cache = Cache::with_response(b"ERROR\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_response_containing_error_string() {
        let mut cache = Cache::with_response(b"VALUE contains_ERROR 0 3\r\nbar\r\nEND\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let value = block_on(ascii.get(&"foo")).unwrap();
        assert_eq!(value, b"bar");
    }

    #[test]
    fn test_client_error_response() {
        let mut cache = Cache::with_response(b"CLIENT_ERROR bad command\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    #[test]
    fn test_server_error_response() {
        let mut cache = Cache::with_response(b"SERVER_ERROR out of memory\r\n");
        let mut ascii = super::Protocol::new(&mut cache);
        let err = block_on(ascii.get(&"foo")).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Other);
    }
}