async_memcached/proto/
ascii_protocol.rs

1use crate::{AsMemcachedValue, ErrorKind};
2use crate::{Client, Error, Response, Status, Value};
3
4use fxhash::FxHashMap;
5use std::future::Future;
6use tokio::io::AsyncWriteExt;
7
8const MAX_KEY_LENGTH: usize = 250; // reference in memcached documentation: https://github.com/memcached/memcached/blob/5609673ed29db98a377749fab469fe80777de8fd/doc/protocol.txt#L46
9
10/// Trait defining ASCII protocol-specific methods for the Client.
11pub trait AsciiProtocol {
12    /// Gets the given key.
13    ///
14    /// If the key is found, `Some(Value)` is returned, describing the metadata and data of the key.
15    ///
16    /// Otherwise, [`Error`] is returned.
17    fn get<K: AsRef<[u8]>>(&mut self, key: K)
18        -> impl Future<Output = Result<Option<Value>, Error>>;
19
20    /// Gets multiple keys.
21    ///
22    /// If any of the keys are found, a vector of [`Value`] will be returned.
23    ///
24    /// Otherwise, [`Error`] is returned.
25    fn get_multi<I, K>(&mut self, keys: I) -> impl Future<Output = Result<Vec<Value>, Error>>
26    where
27        I: IntoIterator<Item = K>,
28        K: AsRef<[u8]>;
29
30    /// Gets the given keys.
31    ///
32    /// Deprecated: This is now an alias for `get_multi`, and  will be removed in the future.
33    #[deprecated(
34        since = "0.4.0",
35        note = "This is now an alias for `get_multi`, and will be removed in the future."
36    )]
37    fn get_many<I, K>(&mut self, keys: I) -> impl Future<Output = Result<Vec<Value>, Error>>
38    where
39        I: IntoIterator<Item = K>,
40        K: AsRef<[u8]>;
41
42    /// Sets the given key.
43    ///
44    /// If `ttl` or `flags` are not specified, they will default to 0. If the value is set
45    /// successfully, `()` is returned, otherwise [`Error`] is returned.
46    fn set<K, V>(
47        &mut self,
48        key: K,
49        value: V,
50        ttl: Option<i64>,
51        flags: Option<u32>,
52    ) -> impl Future<Output = Result<(), Error>>
53    where
54        K: AsRef<[u8]>,
55        V: AsMemcachedValue;
56
57    /// Sets multiple keys and values through pipelined commands.
58    ///
59    /// If `ttl` or `flags` are not specified, they will default to 0. The same values for `ttl` and `flags` will be applied to each key.
60    /// Returns a result with a HashMap of keys mapped to the result of the set operation, or an error.
61    fn set_multi<'a, K, V>(
62        &mut self,
63        kv: &'a [(K, V)],
64        ttl: Option<i64>,
65        flags: Option<u32>,
66    ) -> impl Future<Output = Result<FxHashMap<&'a K, Result<(), Error>>, Error>>
67    where
68        K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
69        V: AsMemcachedValue;
70
71    /// Add a key. If the value exists, Err(Protocol(NotStored)) is returned.
72    fn add<K, V>(
73        &mut self,
74        key: K,
75        value: V,
76        ttl: Option<i64>,
77        flags: Option<u32>,
78    ) -> impl Future<Output = Result<(), Error>>
79    where
80        K: AsRef<[u8]>,
81        V: AsMemcachedValue;
82
83    /// Attempts to add multiple keys and values through pipelined commands.
84    ///
85    /// If `ttl` or `flags` are not specified, they will default to 0. The same values for `ttl` and `flags` will be applied to each key.
86    /// Returns a result with a HashMap of keys mapped to the result of the add operation, or an error.
87    fn add_multi<'a, K, V>(
88        &mut self,
89        kv: &'a [(K, V)],
90        ttl: Option<i64>,
91        flags: Option<u32>,
92    ) -> impl Future<Output = Result<FxHashMap<&'a K, Result<(), Error>>, Error>>
93    where
94        K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
95        V: AsMemcachedValue;
96
97    /// Delete multiple keys
98    fn delete_multi_no_reply<K>(&mut self, keys: &[K]) -> impl Future<Output = Result<(), Error>>
99    where
100        K: AsRef<[u8]>;
101
102    /// Delete a key but don't wait for a reply.
103    fn delete_no_reply<K>(&mut self, key: K) -> impl Future<Output = Result<(), Error>>
104    where
105        K: AsRef<[u8]>;
106
107    /// Delete a key and wait for a reply.
108    fn delete<K>(&mut self, key: K) -> impl Future<Output = Result<(), Error>>
109    where
110        K: AsRef<[u8]>;
111
112    /// Increments the given key by the specified amount.
113    /// Can overflow from the max value of u64 (18446744073709551615) -> 0.
114    /// If the key does not exist, the server will return a KeyNotFound error.
115    /// If the key exists but the value is non-numeric, the server will return a ClientError.
116    fn increment<K>(&mut self, key: K, amount: u64) -> impl Future<Output = Result<u64, Error>>
117    where
118        K: AsRef<[u8]>;
119
120    /// Increments the given key by the specified amount with no reply from the server.
121    /// Can overflow from the max value of u64 (18446744073709551615) -> 0.
122    /// Always returns () for a complete request, will not return any indication of success or failure.
123    fn increment_no_reply<K>(
124        &mut self,
125        key: K,
126        amount: u64,
127    ) -> impl Future<Output = Result<(), Error>>
128    where
129        K: AsRef<[u8]>;
130
131    /// Decrements the given key by the specified amount.
132    /// Will not decrement the counter below 0.
133    /// If the key does not exist, the server will return a KeyNotFound error.
134    /// If the key exists but the value is non-numeric, the server will return a ClientError.
135    fn decrement<K>(&mut self, key: K, amount: u64) -> impl Future<Output = Result<u64, Error>>
136    where
137        K: AsRef<[u8]>;
138
139    /// Decrements the given key by the specified amount with no reply from the server.
140    /// Will not decrement the counter below 0.
141    /// Always returns () for a complete request, will not return any indication of success or failure.
142    fn decrement_no_reply<K>(
143        &mut self,
144        key: K,
145        amount: u64,
146    ) -> impl Future<Output = Result<(), Error>>
147    where
148        K: AsRef<[u8]>;
149}
150
151impl AsciiProtocol for Client {
152    async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Option<Value>, Error> {
153        let kr = Self::validate_key_length(key.as_ref())?;
154
155        self.conn
156            .write_all(&[b"get ", kr, b"\r\n"].concat())
157            .await?;
158        self.conn.flush().await?;
159
160        match self.get_read_write_response().await? {
161            Response::Status(Status::NotFound) => Ok(None),
162            Response::Status(s) => Err(s.into()),
163            Response::Data(d) => d
164                .map(|mut items| {
165                    if items.len() != 1 {
166                        Err(Status::Error(ErrorKind::Protocol(None)).into())
167                    } else {
168                        Ok(items.remove(0))
169                    }
170                })
171                .transpose(),
172            _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))),
173        }
174    }
175
176    async fn get_multi<I, K>(&mut self, keys: I) -> Result<Vec<Value>, Error>
177    where
178        I: IntoIterator<Item = K>,
179        K: AsRef<[u8]>,
180    {
181        self.conn.write_all(b"get").await?;
182        for key in keys {
183            if key.as_ref().len() > MAX_KEY_LENGTH {
184                continue;
185            }
186            self.conn.write_all(b" ").await?;
187            self.conn.write_all(key.as_ref()).await?;
188        }
189        self.conn.write_all(b"\r\n").await?;
190        self.conn.flush().await?;
191
192        match self.get_read_write_response().await? {
193            Response::Status(s) => Err(s.into()),
194            Response::Data(d) => d.ok_or(Status::NotFound.into()),
195            _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
196        }
197    }
198
199    async fn get_many<I, K>(&mut self, keys: I) -> Result<Vec<Value>, Error>
200    where
201        I: IntoIterator<Item = K>,
202        K: AsRef<[u8]>,
203    {
204        self.get_multi(keys).await
205    }
206
207    async fn set<K, V>(
208        &mut self,
209        key: K,
210        value: V,
211        ttl: Option<i64>,
212        flags: Option<u32>,
213    ) -> Result<(), Error>
214    where
215        K: AsRef<[u8]>,
216        V: AsMemcachedValue,
217    {
218        let kr = Self::validate_key_length(key.as_ref())?;
219        let vr = value.as_bytes();
220
221        self.conn.write_all(b"set ").await?;
222        self.conn.write_all(kr).await?;
223
224        let flags = flags.unwrap_or(0).to_string();
225        self.conn.write_all(b" ").await?;
226        self.conn.write_all(flags.as_ref()).await?;
227
228        let ttl = ttl.unwrap_or(0).to_string();
229        self.conn.write_all(b" ").await?;
230        self.conn.write_all(ttl.as_ref()).await?;
231
232        let vlen = vr.len().to_string();
233        self.conn.write_all(b" ").await?;
234        self.conn.write_all(vlen.as_ref()).await?;
235        self.conn.write_all(b"\r\n").await?;
236
237        self.conn.write_all(vr.as_ref()).await?;
238        self.conn.write_all(b"\r\n").await?;
239
240        self.conn.flush().await?;
241
242        match self.get_read_write_response().await? {
243            Response::Status(Status::Stored) => Ok(()),
244            Response::Status(s) => Err(s.into()),
245            _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
246        }
247    }
248
249    async fn set_multi<'a, K, V>(
250        &mut self,
251        kv: &'a [(K, V)],
252        ttl: Option<i64>,
253        flags: Option<u32>,
254    ) -> Result<FxHashMap<&'a K, Result<(), Error>>, Error>
255    where
256        K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
257        V: AsMemcachedValue,
258    {
259        for (key, value) in kv {
260            let kr = key.as_ref();
261            if kr.len() > MAX_KEY_LENGTH {
262                continue;
263            }
264
265            let vr = value.as_bytes();
266
267            self.conn.write_all(b"set ").await?;
268            self.conn.write_all(kr).await?;
269
270            let flags = flags.unwrap_or(0).to_string();
271            self.conn.write_all(b" ").await?;
272            self.conn.write_all(flags.as_ref()).await?;
273
274            let ttl = ttl.unwrap_or(0).to_string();
275            self.conn.write_all(b" ").await?;
276            self.conn.write_all(ttl.as_ref()).await?;
277
278            let vlen = vr.len().to_string();
279            self.conn.write_all(b" ").await?;
280            self.conn.write_all(vlen.as_ref()).await?;
281            self.conn.write_all(b"\r\n").await?;
282
283            self.conn.write_all(vr.as_ref()).await?;
284            self.conn.write_all(b"\r\n").await?;
285        }
286        self.conn.flush().await?;
287
288        let results = self.map_set_multi_responses(kv).await?;
289
290        Ok(results)
291    }
292
293    async fn add<K, V>(
294        &mut self,
295        key: K,
296        value: V,
297        ttl: Option<i64>,
298        flags: Option<u32>,
299    ) -> Result<(), Error>
300    where
301        K: AsRef<[u8]>,
302        V: AsMemcachedValue,
303    {
304        let kr = Self::validate_key_length(key.as_ref())?;
305        let vr = value.as_bytes();
306
307        self.conn.write_all(b"add ").await?;
308        self.conn.write_all(kr).await?;
309
310        let flags = flags.unwrap_or(0).to_string();
311        self.conn.write_all(b" ").await?;
312        self.conn.write_all(flags.as_ref()).await?;
313
314        let ttl = ttl.unwrap_or(0).to_string();
315        self.conn.write_all(b" ").await?;
316        self.conn.write_all(ttl.as_ref()).await?;
317
318        let vlen = vr.len().to_string();
319        self.conn.write_all(b" ").await?;
320        self.conn.write_all(vlen.as_ref()).await?;
321        self.conn.write_all(b"\r\n").await?;
322
323        self.conn.write_all(vr.as_ref()).await?;
324        self.conn.write_all(b"\r\n").await?;
325
326        self.conn.flush().await?;
327
328        match self.get_read_write_response().await? {
329            Response::Status(Status::Stored) => Ok(()),
330            Response::Status(s) => Err(s.into()),
331            _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
332        }
333    }
334
335    async fn add_multi<'a, K, V>(
336        &mut self,
337        kv: &'a [(K, V)],
338        ttl: Option<i64>,
339        flags: Option<u32>,
340    ) -> Result<FxHashMap<&'a K, Result<(), Error>>, Error>
341    where
342        K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
343        V: AsMemcachedValue,
344    {
345        for (key, value) in kv {
346            let kr = key.as_ref();
347            if kr.len() > MAX_KEY_LENGTH {
348                continue;
349            }
350
351            let vr = value.as_bytes();
352
353            self.conn.write_all(b"add ").await?;
354            self.conn.write_all(kr).await?;
355
356            let flags = flags.unwrap_or(0).to_string();
357            self.conn.write_all(b" ").await?;
358            self.conn.write_all(flags.as_ref()).await?;
359
360            let ttl = ttl.unwrap_or(0).to_string();
361            self.conn.write_all(b" ").await?;
362            self.conn.write_all(ttl.as_ref()).await?;
363
364            let vlen = vr.len().to_string();
365            self.conn.write_all(b" ").await?;
366            self.conn.write_all(vlen.as_ref()).await?;
367            self.conn.write_all(b"\r\n").await?;
368
369            self.conn.write_all(vr.as_ref()).await?;
370            self.conn.write_all(b"\r\n").await?;
371        }
372        self.conn.flush().await?;
373
374        let results = self.map_set_multi_responses(kv).await?;
375
376        Ok(results)
377    }
378
379    /// Delete a key but don't wait for a reply.
380    async fn delete_no_reply<K>(&mut self, key: K) -> Result<(), Error>
381    where
382        K: AsRef<[u8]>,
383    {
384        let kr = Self::validate_key_length(key.as_ref())?;
385
386        self.conn
387            .write_all(&[b"delete ", kr, b" noreply\r\n"].concat())
388            .await?;
389        self.conn.flush().await?;
390        Ok(())
391    }
392
393    /// Delete a key and wait for a reply
394    async fn delete<K>(&mut self, key: K) -> Result<(), Error>
395    where
396        K: AsRef<[u8]>,
397    {
398        let kr = Self::validate_key_length(key.as_ref())?;
399
400        self.conn
401            .write_all(&[b"delete ", kr, b"\r\n"].concat())
402            .await?;
403        self.conn.flush().await?;
404
405        match self.get_read_write_response().await? {
406            Response::Status(Status::Deleted) => Ok(()),
407            Response::Status(s) => Err(s.into()),
408            _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
409        }
410    }
411
412    async fn delete_multi_no_reply<K>(&mut self, keys: &[K]) -> Result<(), Error>
413    where
414        K: AsRef<[u8]>,
415    {
416        for key in keys {
417            let kr = key.as_ref();
418            if kr.len() > MAX_KEY_LENGTH {
419                continue;
420            }
421
422            self.conn.write_all(b"delete ").await?;
423            self.conn.write_all(kr).await?;
424            self.conn.write_all(b" noreply\r\n").await?;
425        }
426        self.conn.flush().await?;
427
428        Ok(())
429    }
430
431    async fn increment<K>(&mut self, key: K, amount: u64) -> Result<u64, Error>
432    where
433        K: AsRef<[u8]>,
434    {
435        let kr = Self::validate_key_length(key.as_ref())?;
436
437        self.conn
438            .write_all(&[b"incr ", kr, b" ", amount.to_string().as_bytes(), b"\r\n"].concat())
439            .await?;
440        self.conn.flush().await?;
441
442        match self.get_read_write_response().await? {
443            Response::Status(s) => Err(s.into()),
444            Response::IncrDecr(amount) => Ok(amount),
445            _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
446        }
447    }
448
449    async fn increment_no_reply<K>(&mut self, key: K, amount: u64) -> Result<(), Error>
450    where
451        K: AsRef<[u8]>,
452    {
453        let kr = Self::validate_key_length(key.as_ref())?;
454
455        self.conn
456            .write_all(
457                &[
458                    b"incr ",
459                    kr,
460                    b" ",
461                    amount.to_string().as_bytes(),
462                    b" noreply\r\n",
463                ]
464                .concat(),
465            )
466            .await?;
467        self.conn.flush().await?;
468
469        Ok(())
470    }
471
472    async fn decrement<K>(&mut self, key: K, amount: u64) -> Result<u64, Error>
473    where
474        K: AsRef<[u8]>,
475    {
476        let kr = Self::validate_key_length(key.as_ref())?;
477
478        self.conn
479            .write_all(&[b"decr ", kr, b" ", amount.to_string().as_bytes(), b"\r\n"].concat())
480            .await?;
481        self.conn.flush().await?;
482
483        match self.get_read_write_response().await? {
484            Response::Status(s) => Err(s.into()),
485            Response::IncrDecr(amount) => Ok(amount),
486            _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
487        }
488    }
489
490    async fn decrement_no_reply<K>(&mut self, key: K, amount: u64) -> Result<(), Error>
491    where
492        K: AsRef<[u8]>,
493    {
494        let kr = Self::validate_key_length(key.as_ref())?;
495
496        self.conn
497            .write_all(
498                &[
499                    b"decr ",
500                    kr,
501                    b" ",
502                    amount.to_string().as_bytes(),
503                    b" noreply\r\n",
504                ]
505                .concat(),
506            )
507            .await?;
508        self.conn.flush().await?;
509
510        Ok(())
511    }
512}