async_memcached/proto/
meta_protocol.rs

1use crate::{AsMemcachedValue, Client, Error, Status};
2
3use crate::parser::{
4    parse_meta_arithmetic_response, parse_meta_delete_response, parse_meta_get_response,
5    parse_meta_set_response,
6};
7use crate::parser::{MetaResponse, MetaValue};
8
9use std::future::Future;
10
11use tokio::io::AsyncWriteExt;
12
13/// Trait defining Meta protocol-specific methods for the Client.
14pub trait MetaProtocol {
15    /// Gets the given key with additional metadata.
16    ///
17    /// If the key is found, `Some(MetaValue)` is returned, describing the metadata and data of the key.
18    ///
19    /// Otherwise, `None` is returned.
20    //
21    // Command format:
22    // mg <key> <meta_flags>*\r\n
23    //
24    // - <key> is the key string, with a maximum length of 250 bytes.
25    //
26    // - <meta_flags> is an optional slice of string references for meta flags.
27    // Meta flags may have associated tokens after the initial character, e.g. "O123" for opaque.
28    // Using the "q" flag for quiet mode will append a no-op command to the request ("mn\r\n") so that the client
29    // can proceed properly in the event of a cache miss.
30    fn meta_get<K: AsRef<[u8]>>(
31        &mut self,
32        key: K,
33        is_quiet: bool,
34        opaque: Option<&[u8]>,
35        meta_flags: Option<&[&str]>,
36    ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
37
38    /// Sets the given key with additional metadata.
39    ///
40    /// If the value is set successfully, `Some(MetaValue)` is returned, otherwise [`Error`] is returned.
41    /// NOTE: That the data in this MetaValue is sparsely populated, containing only requested data by meta_flags
42    /// The meta set command is a generic command for storing data to memcached. Based on the flags supplied,
43    /// it can replace all storage commands (see token M) as well as adds new options.
44    //
45    // Command format:
46    // ms <key> <datalen> <meta_flags>*\r\n<data_block>\r\n
47    //
48    // - <key> is the key string, with a maximum length of 250 bytes.
49    // - <datalen> is the length of the payload data.
50    //
51    // - <meta_flags> is an optional slice of string references for meta flags.
52    // Meta flags may have associated tokens after the initial character, e.g. "O123" for opaque.
53    //
54    // - <data_block> is the payload data to be stored, with a maximum size of ~1MB.
55    fn meta_set<K, V>(
56        &mut self,
57        key: K,
58        value: V,
59        is_quiet: bool,
60        opaque: Option<&[u8]>,
61        meta_flags: Option<&[&str]>,
62    ) -> impl Future<Output = Result<Option<MetaValue>, Error>>
63    where
64        K: AsRef<[u8]>,
65        V: AsMemcachedValue;
66
67    /// Deletes the given key with additional metadata.
68    ///
69    /// If the key is found, it will be deleted, invalidated or tombstoned depending on the meta flags provided.
70    /// If data is requested back via meta flags then a `MetaValue` is returned, otherwise `None`.
71    //
72    // Command format:
73    // md <key> <meta_flags>*\r\n
74    //
75    // - <key> is the key string, with a maximum length of 250 bytes.
76    //
77    // - <meta_flags> is an optional slice of string references for meta flags.
78    // Meta flags may have associated tokens after the initial character, e.g. "O123" for opaque.
79    fn meta_delete<K: AsRef<[u8]>>(
80        &mut self,
81        key: K,
82        is_quiet: bool,
83        opaque: Option<&[u8]>,
84        meta_flags: Option<&[&str]>,
85    ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
86
87    /// Performs an increment (arithmetic) operation on the given key.
88    ///
89    /// If the key is found, the increment operation is performed.
90    /// If data is requested back via meta flags then a `MetaValue` is returned, otherwise `None`.
91    ///
92    /// Command format:
93    ///   ma <key> <meta_flags>*\r\n
94    ///
95    /// - <key> is the key string, with a maximum length of 250 bytes.
96    ///
97    /// - <opaque> is an optional slice of string references with a maximum length of 32 bytes.
98    ///
99    /// - <delta> is an optional u64 value for the decrement delta.
100    ///   The default behaviour is to decrement with a delta of 1.
101    ///
102    /// - <is_quiet> is a boolean value indicating whether to use quiet mode.
103    ///   quiet mode will append a no-op command to the request ("mn\r\n") so that the client
104    ///   can proceed properly in the event of a cache miss.
105    ///
106    /// - <meta_flags> is an optional slice of string references for additional meta flags.
107    ///   Meta flags may have associated tokens after the initial character, e.g "N123"
108    ///   Do not include "M", "D", "O" or "q" flags as additional meta flags, they will be ignored.
109    ///   Instead, use the specified parameters.
110    fn meta_increment<K: AsRef<[u8]>>(
111        &mut self,
112        key: K,
113        is_quiet: bool,
114        opaque: Option<&[u8]>,
115        delta: Option<u64>,
116        meta_flags: Option<&[&str]>,
117    ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
118
119    /// Performs a decrement (arithmetic) operation on the given key.
120    ///
121    /// If the key is found, the decrement operation is performed.
122    /// If data is requested back via meta flags then a `MetaValue` is returned, otherwise `None`.
123    ///
124    /// Command format:
125    ///   ma <key> MD <meta_flags>*\r\n
126    ///
127    /// - <key> is the key string, with a maximum length of 250 bytes.
128    ///
129    /// - <opaque> is an optional slice of string references with a maximum length of 32 bytes.
130    ///
131    /// - <delta> is an optional u64 value for the decrement delta.
132    ///   The default behaviour is to decrement with a delta of 1.
133    ///
134    /// - <is_quiet> is a boolean value indicating whether to use quiet mode.
135    ///   quiet mode will append a no-op command to the request ("mn\r\n") so that the client
136    ///   can proceed properly in the event of a cache miss.
137    ///
138    /// - <meta_flags> is an optional slice of string references for additional meta flags.
139    ///   Meta flags may have associated tokens after the initial character, e.g "N123"
140    ///   Do not include "M", "D", "O" or "q" flags as additional meta flags, they will be ignored.
141    ///   Instead, use the specified parameters.
142    fn meta_decrement<K: AsRef<[u8]>>(
143        &mut self,
144        key: K,
145        is_quiet: bool,
146        opaque: Option<&[u8]>,
147        delta: Option<u64>,
148        meta_flags: Option<&[&str]>,
149    ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
150}
151
152impl MetaProtocol for Client {
153    async fn meta_get<K: AsRef<[u8]>>(
154        &mut self,
155        key: K,
156        is_quiet: bool,
157        opaque: Option<&[u8]>,
158        meta_flags: Option<&[&str]>,
159    ) -> Result<Option<MetaValue>, Error> {
160        let kr = Self::validate_key_length(key.as_ref())?;
161
162        if let Some(opaque) = &opaque {
163            Self::validate_opaque_length(opaque)?;
164        }
165
166        self.conn.write_all(b"mg ").await?;
167        self.conn.write_all(kr).await?;
168
169        Self::check_and_write_opaque(self, opaque).await?;
170
171        Self::check_and_write_meta_flags(self, meta_flags, opaque).await?;
172
173        Self::check_and_write_quiet_mode(self, is_quiet).await?;
174
175        self.conn.flush().await?;
176
177        match self.drive_receive(parse_meta_get_response).await? {
178            MetaResponse::Status(Status::NotFound) => Ok(None),
179            MetaResponse::Status(Status::NoOp) => Ok(None),
180            MetaResponse::Status(s) => Err(s.into()),
181            MetaResponse::Data(d) => d
182                .map(|mut items| {
183                    let item = items.remove(0);
184                    Ok(item)
185                })
186                .transpose(),
187        }
188    }
189
190    async fn meta_set<K, V>(
191        &mut self,
192        key: K,
193        value: V,
194        is_quiet: bool,
195        opaque: Option<&[u8]>,
196        meta_flags: Option<&[&str]>,
197    ) -> Result<Option<MetaValue>, Error>
198    where
199        K: AsRef<[u8]>,
200        V: AsMemcachedValue,
201    {
202        let kr = Self::validate_key_length(key.as_ref())?;
203
204        if let Some(opaque) = &opaque {
205            Self::validate_opaque_length(opaque)?;
206        }
207
208        let vr = value.as_bytes();
209
210        self.conn.write_all(b"ms ").await?;
211        self.conn.write_all(kr).await?;
212
213        let vlen = vr.len().to_string();
214        self.conn.write_all(b" ").await?;
215        self.conn.write_all(vlen.as_ref()).await?;
216
217        Self::check_and_write_opaque(self, opaque).await?;
218
219        Self::check_and_write_meta_flags(self, meta_flags, opaque).await?;
220
221        if is_quiet {
222            self.conn.write_all(b" q").await?;
223        }
224
225        self.conn.write_all(b"\r\n").await?;
226        self.conn.write_all(vr.as_ref()).await?;
227        self.conn.write_all(b"\r\n").await?;
228
229        if is_quiet {
230            self.conn.write_all(b"mn\r\n").await?;
231        }
232
233        self.conn.flush().await?;
234
235        match self.drive_receive(parse_meta_set_response).await? {
236            MetaResponse::Status(Status::Stored) => Ok(None),
237            MetaResponse::Status(Status::NoOp) => Ok(None),
238            MetaResponse::Status(s) => Err(s.into()),
239            MetaResponse::Data(d) => d
240                .map(|mut items| {
241                    let item = items.remove(0);
242                    Ok(item)
243                })
244                .transpose(),
245        }
246    }
247
248    async fn meta_delete<K: AsRef<[u8]>>(
249        &mut self,
250        key: K,
251        is_quiet: bool,
252        opaque: Option<&[u8]>,
253        meta_flags: Option<&[&str]>,
254    ) -> Result<Option<MetaValue>, Error> {
255        let kr = Self::validate_key_length(key.as_ref())?;
256
257        if let Some(opaque) = &opaque {
258            Self::validate_opaque_length(opaque)?;
259        }
260
261        self.conn.write_all(b"md ").await?;
262        self.conn.write_all(kr).await?;
263
264        Self::check_and_write_opaque(self, opaque).await?;
265
266        Self::check_and_write_meta_flags(self, meta_flags, opaque).await?;
267
268        Self::check_and_write_quiet_mode(self, is_quiet).await?;
269
270        self.conn.flush().await?;
271
272        match self.drive_receive(parse_meta_delete_response).await? {
273            MetaResponse::Status(Status::Deleted) => Ok(None),
274            MetaResponse::Status(Status::Exists) => Err(Error::Protocol(Status::Exists)),
275            MetaResponse::Status(Status::NoOp) => Ok(None),
276            MetaResponse::Status(s) => Err(s.into()),
277            MetaResponse::Data(d) => d
278                .map(|mut items| {
279                    let item = items.remove(0);
280                    Ok(item)
281                })
282                .transpose(),
283        }
284    }
285
286    async fn meta_increment<K: AsRef<[u8]>>(
287        &mut self,
288        key: K,
289        is_quiet: bool,
290        opaque: Option<&[u8]>,
291        delta: Option<u64>,
292        meta_flags: Option<&[&str]>,
293    ) -> Result<Option<MetaValue>, Error> {
294        let kr = Self::validate_key_length(key.as_ref())?;
295
296        if let Some(opaque) = &opaque {
297            Self::validate_opaque_length(opaque)?;
298        }
299
300        self.conn.write_all(b"ma ").await?;
301        self.conn.write_all(kr).await?;
302
303        Self::check_and_write_opaque(self, opaque).await?;
304
305        // skip writing "MI" because it's default behaviour and we can save the bytes.
306        if let Some(delta) = delta {
307            if delta != 1 {
308                self.conn.write_all(b" D").await?;
309                self.conn.write_all(delta.to_string().as_bytes()).await?;
310            }
311        }
312
313        if let Some(meta_flags) = meta_flags {
314            for flag in meta_flags {
315                // ignore M flag because it's specific to the method called, ignore q and require param to be used
316                // prefer explicit D and O params over meta flags
317                if flag.starts_with('M')
318                    || flag.starts_with('q')
319                    || (flag.starts_with('D') && delta.is_some())
320                    || (flag.starts_with('O') && opaque.is_some())
321                {
322                    continue;
323                } else {
324                    self.conn.write_all(b" ").await?;
325                    self.conn.write_all(flag.as_bytes()).await?;
326                }
327            }
328        }
329
330        Self::check_and_write_quiet_mode(self, is_quiet).await?;
331
332        self.conn.flush().await?;
333
334        match self.drive_receive(parse_meta_arithmetic_response).await? {
335            MetaResponse::Status(Status::Stored) => Ok(None),
336            MetaResponse::Status(Status::NoOp) => Ok(None),
337            MetaResponse::Status(s) => Err(s.into()),
338            MetaResponse::Data(d) => d
339                .map(|mut items| {
340                    let item = items.remove(0);
341                    Ok(item)
342                })
343                .transpose(),
344        }
345    }
346
347    async fn meta_decrement<K: AsRef<[u8]>>(
348        &mut self,
349        key: K,
350        is_quiet: bool,
351        opaque: Option<&[u8]>,
352        delta: Option<u64>,
353        meta_flags: Option<&[&str]>,
354    ) -> Result<Option<MetaValue>, Error> {
355        let kr = Self::validate_key_length(key.as_ref())?;
356
357        if let Some(opaque) = &opaque {
358            Self::validate_opaque_length(opaque)?;
359        }
360
361        self.conn.write_all(b"ma ").await?;
362        self.conn.write_all(kr).await?;
363        self.conn.write_all(b" MD").await?;
364
365        Self::check_and_write_opaque(self, opaque).await?;
366
367        if let Some(delta) = delta {
368            if delta != 1 {
369                self.conn.write_all(b" D").await?;
370                self.conn.write_all(delta.to_string().as_bytes()).await?;
371            }
372        }
373
374        if let Some(meta_flags) = meta_flags {
375            for flag in meta_flags {
376                // ignore M flag because it's specific to the method called, ignore q and require param to be used
377                // prefer explicit D and O params over meta flags
378                if flag.starts_with('M')
379                    || flag.starts_with('q')
380                    || (flag.starts_with('D') && delta.is_some())
381                    || (flag.starts_with('O') && opaque.is_some())
382                {
383                    continue;
384                } else {
385                    self.conn.write_all(b" ").await?;
386                    self.conn.write_all(flag.as_bytes()).await?;
387                }
388            }
389        }
390
391        Self::check_and_write_quiet_mode(self, is_quiet).await?;
392
393        self.conn.flush().await?;
394
395        match self.drive_receive(parse_meta_arithmetic_response).await? {
396            MetaResponse::Status(Status::Stored) => Ok(None),
397            MetaResponse::Status(Status::NoOp) => Ok(None),
398            MetaResponse::Status(s) => Err(s.into()),
399            MetaResponse::Data(d) => d
400                .map(|mut items| {
401                    let item = items.remove(0);
402                    Ok(item)
403                })
404                .transpose(),
405        }
406    }
407}