async_memcached/
lib.rs

1//! A Tokio-based memcached client.
2#![deny(warnings, missing_docs)]
3
4use bytes::BytesMut;
5use fxhash::FxHashMap;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
7
8mod connection;
9use self::connection::Connection;
10
11mod error;
12pub use self::error::Error;
13
14mod parser;
15use self::parser::{
16    parse_ascii_metadump_response, parse_ascii_response, parse_ascii_stats_response,
17};
18pub use self::parser::{
19    ErrorKind, KeyMetadata, MetadumpResponse, Response, StatsResponse, Status, Value,
20};
21
22/// Ascii & Meta protocol implementations
23pub mod proto;
24pub use self::proto::{AsciiProtocol, MetaProtocol};
25
26mod value_serializer;
27pub use self::value_serializer::AsMemcachedValue;
28
// Maximum key length, in bytes, that this client accepts before rejecting a key.
//
// Reference in memcached documentation:
// https://github.com/memcached/memcached/blob/5609673ed29db98a377749fab469fe80777de8fd/doc/protocol.txt#L46
const MAX_KEY_LENGTH: usize = 250;
30
/// High-level memcached client.
///
/// [`Client`] is mapped one-to-one with a given connection to a memcached server, and provides a
/// high-level API for executing commands on that connection.
pub struct Client {
    // Bytes read from the connection that the response parsers operate on.
    buf: BytesMut,
    // Length of the most recently parsed response; `drive_receive` discards that many bytes
    // from the front of `buf` before it parses the next response.
    last_read_n: Option<usize>,
    // The underlying TCP or UNIX socket connection to the server.
    conn: Connection,
}
40
41impl Client {
42    /// Creates a new [`Client`] based on the given data source string.
43    ///
44    /// Supports UNIX domain sockets and TCP connections.
45    /// For TCP: the DSN should be in the format of `tcp://<IP>:<port>` or `<IP>:<port>`.
46    /// For UNIX: the DSN should be in the format of `unix://<path>`.
47    pub async fn new<S: AsRef<str>>(dsn: S) -> Result<Client, Error> {
48        let connection = Connection::new(dsn).await?;
49
50        Ok(Client {
51            buf: BytesMut::new(),
52            last_read_n: None,
53            conn: connection,
54        })
55    }
56
57    pub(crate) async fn drive_receive<R, F>(&mut self, op: F) -> Result<R, Error>
58    where
59        F: Fn(&[u8]) -> Result<Option<(usize, R)>, ErrorKind>,
60    {
61        // If we serviced a previous request, advance our buffer forward.
62        if let Some(n) = self.last_read_n {
63            // Not sure how this situation occurs, but it seems to be related to transient network
64            // issues. This guard is here to prevent panics, but it's not clear what the correct
65            // behavior is. For now, we just return an error, which allows the caller to retry or
66            // fall back to the uncached data source as they see fit.
67            if n > self.buf.len() {
68                return Err(Status::Error(ErrorKind::Client(
69                    "Buffer length is less than last read length".to_string(),
70                ))
71                .into());
72            }
73            let _ = self.buf.split_to(n);
74        }
75
76        let mut needs_more_data = false;
77        loop {
78            if self.buf.is_empty() || needs_more_data {
79                match self.conn {
80                    Connection::Tcp(ref mut s) => {
81                        self.buf.reserve(1024);
82                        let n = s.read_buf(&mut self.buf).await?;
83                        if n == 0 {
84                            return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into()));
85                        }
86                    }
87                    Connection::Unix(ref mut s) => {
88                        self.buf.reserve(1024);
89                        let n = s.read_buf(&mut self.buf).await?;
90                        if n == 0 {
91                            return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into()));
92                        }
93                    }
94                }
95            }
96
97            // Try and parse out a response.
98            match op(&self.buf) {
99                // We got a response.
100                Ok(Some((n, response))) => {
101                    self.last_read_n = Some(n);
102                    return Ok(response);
103                }
104                // We didn't have enough data, so loop around and try again.
105                Ok(None) => {
106                    needs_more_data = true;
107                    continue;
108                }
109                // Invalid data not matching the protocol.
110                Err(kind) => return Err(Status::Error(kind).into()),
111            }
112        }
113    }
114
115    pub(crate) async fn get_read_write_response(&mut self) -> Result<Response, Error> {
116        self.drive_receive(parse_ascii_response).await
117    }
118
119    pub(crate) async fn map_set_multi_responses<'a, K, V>(
120        &mut self,
121        kv: &'a [(K, V)],
122    ) -> Result<FxHashMap<&'a K, Result<(), Error>>, Error>
123    where
124        K: AsRef<[u8]> + Eq + std::hash::Hash,
125        V: AsMemcachedValue,
126    {
127        let mut results = FxHashMap::with_capacity_and_hasher(kv.len(), Default::default());
128
129        for (key, _) in kv {
130            let kr = key.as_ref();
131            if kr.len() > MAX_KEY_LENGTH {
132                results.insert(
133                    key,
134                    Err(Error::Protocol(Status::Error(ErrorKind::Client(
135                        "Key exceeds maximum length of 250 bytes".to_string(),
136                    )))),
137                );
138                continue;
139            }
140
141            let result = match self.drive_receive(parse_ascii_response).await {
142                Ok(Response::Status(Status::Stored)) => Ok(()),
143                Ok(Response::Status(s)) => Err(s.into()),
144                Ok(_) => Err(Status::Error(ErrorKind::Protocol(None)).into()),
145                Err(e) => return Err(e),
146            };
147
148            results.insert(key, result);
149        }
150
151        Ok(results)
152    }
153
154    pub(crate) async fn get_metadump_response(&mut self) -> Result<MetadumpResponse, Error> {
155        self.drive_receive(parse_ascii_metadump_response).await
156    }
157
158    pub(crate) async fn get_stats_response(&mut self) -> Result<StatsResponse, Error> {
159        self.drive_receive(parse_ascii_stats_response).await
160    }
161
162    /// Gets the version of the server.
163    ///
164    /// If the version is retrieved successfully, `String` is returned containing the version
165    /// component e.g. `1.6.7`, otherwise [`Error`] is returned.
166    ///
167    /// For some setups, such as those using Twemproxy, this will return an error as those
168    /// intermediate proxies do not support the version command.
169    pub async fn version(&mut self) -> Result<String, Error> {
170        self.conn.write_all(b"version\r\n").await?;
171        self.conn.flush().await?;
172
173        let mut version = String::new();
174        let bytes = self.conn.read_line(&mut version).await?;
175
176        // Peel off the leading "VERSION " header.
177        if bytes >= 8 && version.is_char_boundary(8) {
178            Ok(version.split_off(8))
179        } else {
180            Err(Error::from(Status::Error(ErrorKind::Protocol(Some(
181                format!("Invalid response for `version` command: `{version}`"),
182            )))))
183        }
184    }
185
186    /// Dumps all keys from the server.
187    ///
188    /// This operation scans all slab classes from tail to head, in a non-blocking fashion.  Thus,
189    /// not all items will be found as new items could be inserted or deleted while the crawler is
190    /// still running.
191    ///
192    /// [`MetadumpIter`] must be iterated over to discover whether or not the crawler successfully
193    /// started, as this call will only return [`Error`] if the command failed to be written to the
194    /// server at all.
195    ///
196    /// Available as of memcached 1.4.31.
197    pub async fn dump_keys(&mut self) -> Result<MetadumpIter<'_>, Error> {
198        self.conn.write_all(b"lru_crawler metadump all\r\n").await?;
199        self.conn.flush().await?;
200
201        Ok(MetadumpIter {
202            client: self,
203            done: false,
204        })
205    }
206
207    /// Collects statistics from the server.
208    ///
209    /// The statistics that may be returned are detailed in the protocol specification for
210    /// memcached, but all values returned by this method are returned as strings and are not
211    /// further interpreted or validated for conformity.
212    pub async fn stats(&mut self) -> Result<FxHashMap<String, String>, Error> {
213        let mut entries = FxHashMap::default();
214
215        self.conn.write_all(b"stats\r\n").await?;
216        self.conn.flush().await?;
217
218        while let StatsResponse::Entry(key, value) = self.get_stats_response().await? {
219            entries.insert(key, value);
220        }
221
222        Ok(entries)
223    }
224
225    /// Flushes all existing items on the server
226    ///
227    /// This operation invalidates all existing items immediately. Any items with an update time
228    /// older than the time of the flush_all operation will be ignored for retrieval purposes.
229    /// This operation does not free up memory taken up by the existing items.
230    pub async fn flush_all(&mut self) -> Result<(), Error> {
231        self.conn.write_all(b"flush_all\r\n").await?;
232        self.conn.flush().await?;
233
234        let mut response = String::new();
235        self.conn.read_line(&mut response).await?;
236        // check if response is ok
237        if response.trim() == "OK" {
238            Ok(())
239        } else {
240            Err(Error::from(Status::Error(ErrorKind::Protocol(Some(
241                format!("Invalid response for `flush_all` command: `{response}`"),
242            )))))
243        }
244    }
245
246    fn validate_key_length(kr: &[u8]) -> Result<&[u8], Error> {
247        if kr.len() > MAX_KEY_LENGTH {
248            return Err(Error::from(Status::Error(ErrorKind::KeyTooLong)));
249        }
250        Ok(kr)
251    }
252
253    fn validate_opaque_length(opaque: &[u8]) -> Result<&[u8], Error> {
254        if opaque.len() > 32 {
255            return Err(Error::from(Status::Error(ErrorKind::OpaqueTooLong)));
256        }
257        Ok(opaque)
258    }
259
260    async fn check_and_write_opaque(&mut self, opaque: Option<&[u8]>) -> Result<(), Error> {
261        if let Some(opaque) = &opaque {
262            self.conn.write_all(b" O").await?;
263            self.conn.write_all(opaque.as_ref()).await?;
264        }
265        Ok(())
266    }
267
268    async fn check_and_write_meta_flags(
269        &mut self,
270        meta_flags: Option<&[&str]>,
271        opaque: Option<&[u8]>,
272    ) -> Result<(), Error> {
273        if let Some(meta_flags) = meta_flags {
274            for flag in meta_flags {
275                // Ignore q flag and require use of param, prefer explicit opaque param over O meta flag
276                if flag.starts_with('q') || (flag.starts_with('O') && opaque.is_some()) {
277                    continue;
278                } else {
279                    self.conn.write_all(b" ").await?;
280                    self.conn.write_all(flag.as_bytes()).await?;
281                }
282            }
283        }
284        Ok(())
285    }
286
287    async fn check_and_write_quiet_mode(&mut self, is_quiet: bool) -> Result<(), Error> {
288        if is_quiet {
289            self.conn.write_all(b" q\r\nmn\r\n").await?;
290        } else {
291            self.conn.write_all(b"\r\n").await?;
292        }
293        Ok(())
294    }
295}
296
/// Asynchronous iterator for metadump operations.
pub struct MetadumpIter<'a> {
    // Client whose connection the metadump responses are read from.
    client: &'a mut Client,
    // Set once the dump has finished; `next` returns `None` from then on.
    done: bool,
}
302
303impl MetadumpIter<'_> {
304    /// Gets the next result for the current operation.
305    ///
306    /// If there is another key in the dump, `Some(Ok(KeyMetadata))` will be returned.  If there was
307    /// an error while attempting to start the metadump operation, or if there was a general
308    /// network/protocol-level error, `Some(Err(Error))` will be returned.
309    ///
310    /// Otherwise, `None` will be returned and signals the end of the iterator.  Subsequent calls
311    /// will return `None`.
312    pub async fn next(&mut self) -> Option<Result<KeyMetadata, Error>> {
313        if self.done {
314            return None;
315        }
316
317        match self.client.get_metadump_response().await {
318            Ok(MetadumpResponse::End) => {
319                self.done = true;
320                None
321            }
322            Ok(MetadumpResponse::BadClass(s)) => {
323                self.done = true;
324                Some(Err(Error::Protocol(MetadumpResponse::BadClass(s).into())))
325            }
326            Ok(MetadumpResponse::Busy(s)) => {
327                Some(Err(Error::Protocol(MetadumpResponse::Busy(s).into())))
328            }
329            Ok(MetadumpResponse::Entry(km)) => Some(Ok(km)),
330            Err(e) => Some(Err(e)),
331        }
332    }
333}