1#![deny(warnings, missing_docs)]
3
4use bytes::BytesMut;
5use fxhash::FxHashMap;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
7
8mod connection;
9use self::connection::Connection;
10
11mod error;
12pub use self::error::Error;
13
14mod parser;
15use self::parser::{
16 parse_ascii_metadump_response, parse_ascii_response, parse_ascii_stats_response,
17};
18pub use self::parser::{
19 ErrorKind, KeyMetadata, MetadumpResponse, Response, StatsResponse, Status, Value,
20};
21
22pub mod proto;
24pub use self::proto::{AsciiProtocol, MetaProtocol};
25
26mod value_serializer;
27pub use self::value_serializer::AsMemcachedValue;
28
29const MAX_KEY_LENGTH: usize = 250; pub struct Client {
36 buf: BytesMut,
37 last_read_n: Option<usize>,
38 conn: Connection,
39}
40
41impl Client {
42 pub async fn new<S: AsRef<str>>(dsn: S) -> Result<Client, Error> {
48 let connection = Connection::new(dsn).await?;
49
50 Ok(Client {
51 buf: BytesMut::new(),
52 last_read_n: None,
53 conn: connection,
54 })
55 }
56
57 pub(crate) async fn drive_receive<R, F>(&mut self, op: F) -> Result<R, Error>
58 where
59 F: Fn(&[u8]) -> Result<Option<(usize, R)>, ErrorKind>,
60 {
61 if let Some(n) = self.last_read_n {
63 if n > self.buf.len() {
68 return Err(Status::Error(ErrorKind::Client(
69 "Buffer length is less than last read length".to_string(),
70 ))
71 .into());
72 }
73 let _ = self.buf.split_to(n);
74 }
75
76 let mut needs_more_data = false;
77 loop {
78 if self.buf.is_empty() || needs_more_data {
79 match self.conn {
80 Connection::Tcp(ref mut s) => {
81 self.buf.reserve(1024);
82 let n = s.read_buf(&mut self.buf).await?;
83 if n == 0 {
84 return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into()));
85 }
86 }
87 Connection::Unix(ref mut s) => {
88 self.buf.reserve(1024);
89 let n = s.read_buf(&mut self.buf).await?;
90 if n == 0 {
91 return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into()));
92 }
93 }
94 }
95 }
96
97 match op(&self.buf) {
99 Ok(Some((n, response))) => {
101 self.last_read_n = Some(n);
102 return Ok(response);
103 }
104 Ok(None) => {
106 needs_more_data = true;
107 continue;
108 }
109 Err(kind) => return Err(Status::Error(kind).into()),
111 }
112 }
113 }
114
115 pub(crate) async fn get_read_write_response(&mut self) -> Result<Response, Error> {
116 self.drive_receive(parse_ascii_response).await
117 }
118
119 pub(crate) async fn map_set_multi_responses<'a, K, V>(
120 &mut self,
121 kv: &'a [(K, V)],
122 ) -> Result<FxHashMap<&'a K, Result<(), Error>>, Error>
123 where
124 K: AsRef<[u8]> + Eq + std::hash::Hash,
125 V: AsMemcachedValue,
126 {
127 let mut results = FxHashMap::with_capacity_and_hasher(kv.len(), Default::default());
128
129 for (key, _) in kv {
130 let kr = key.as_ref();
131 if kr.len() > MAX_KEY_LENGTH {
132 results.insert(
133 key,
134 Err(Error::Protocol(Status::Error(ErrorKind::Client(
135 "Key exceeds maximum length of 250 bytes".to_string(),
136 )))),
137 );
138 continue;
139 }
140
141 let result = match self.drive_receive(parse_ascii_response).await {
142 Ok(Response::Status(Status::Stored)) => Ok(()),
143 Ok(Response::Status(s)) => Err(s.into()),
144 Ok(_) => Err(Status::Error(ErrorKind::Protocol(None)).into()),
145 Err(e) => return Err(e),
146 };
147
148 results.insert(key, result);
149 }
150
151 Ok(results)
152 }
153
154 pub(crate) async fn get_metadump_response(&mut self) -> Result<MetadumpResponse, Error> {
155 self.drive_receive(parse_ascii_metadump_response).await
156 }
157
158 pub(crate) async fn get_stats_response(&mut self) -> Result<StatsResponse, Error> {
159 self.drive_receive(parse_ascii_stats_response).await
160 }
161
162 pub async fn version(&mut self) -> Result<String, Error> {
170 self.conn.write_all(b"version\r\n").await?;
171 self.conn.flush().await?;
172
173 let mut version = String::new();
174 let bytes = self.conn.read_line(&mut version).await?;
175
176 if bytes >= 8 && version.is_char_boundary(8) {
178 Ok(version.split_off(8))
179 } else {
180 Err(Error::from(Status::Error(ErrorKind::Protocol(Some(
181 format!("Invalid response for `version` command: `{version}`"),
182 )))))
183 }
184 }
185
186 pub async fn dump_keys(&mut self) -> Result<MetadumpIter<'_>, Error> {
198 self.conn.write_all(b"lru_crawler metadump all\r\n").await?;
199 self.conn.flush().await?;
200
201 Ok(MetadumpIter {
202 client: self,
203 done: false,
204 })
205 }
206
207 pub async fn stats(&mut self) -> Result<FxHashMap<String, String>, Error> {
213 let mut entries = FxHashMap::default();
214
215 self.conn.write_all(b"stats\r\n").await?;
216 self.conn.flush().await?;
217
218 while let StatsResponse::Entry(key, value) = self.get_stats_response().await? {
219 entries.insert(key, value);
220 }
221
222 Ok(entries)
223 }
224
225 pub async fn flush_all(&mut self) -> Result<(), Error> {
231 self.conn.write_all(b"flush_all\r\n").await?;
232 self.conn.flush().await?;
233
234 let mut response = String::new();
235 self.conn.read_line(&mut response).await?;
236 if response.trim() == "OK" {
238 Ok(())
239 } else {
240 Err(Error::from(Status::Error(ErrorKind::Protocol(Some(
241 format!("Invalid response for `flush_all` command: `{response}`"),
242 )))))
243 }
244 }
245
246 fn validate_key_length(kr: &[u8]) -> Result<&[u8], Error> {
247 if kr.len() > MAX_KEY_LENGTH {
248 return Err(Error::from(Status::Error(ErrorKind::KeyTooLong)));
249 }
250 Ok(kr)
251 }
252
253 fn validate_opaque_length(opaque: &[u8]) -> Result<&[u8], Error> {
254 if opaque.len() > 32 {
255 return Err(Error::from(Status::Error(ErrorKind::OpaqueTooLong)));
256 }
257 Ok(opaque)
258 }
259
260 async fn check_and_write_opaque(&mut self, opaque: Option<&[u8]>) -> Result<(), Error> {
261 if let Some(opaque) = &opaque {
262 self.conn.write_all(b" O").await?;
263 self.conn.write_all(opaque.as_ref()).await?;
264 }
265 Ok(())
266 }
267
268 async fn check_and_write_meta_flags(
269 &mut self,
270 meta_flags: Option<&[&str]>,
271 opaque: Option<&[u8]>,
272 ) -> Result<(), Error> {
273 if let Some(meta_flags) = meta_flags {
274 for flag in meta_flags {
275 if flag.starts_with('q') || (flag.starts_with('O') && opaque.is_some()) {
277 continue;
278 } else {
279 self.conn.write_all(b" ").await?;
280 self.conn.write_all(flag.as_bytes()).await?;
281 }
282 }
283 }
284 Ok(())
285 }
286
287 async fn check_and_write_quiet_mode(&mut self, is_quiet: bool) -> Result<(), Error> {
288 if is_quiet {
289 self.conn.write_all(b" q\r\nmn\r\n").await?;
290 } else {
291 self.conn.write_all(b"\r\n").await?;
292 }
293 Ok(())
294 }
295}
296
297pub struct MetadumpIter<'a> {
299 client: &'a mut Client,
300 done: bool,
301}
302
303impl MetadumpIter<'_> {
304 pub async fn next(&mut self) -> Option<Result<KeyMetadata, Error>> {
313 if self.done {
314 return None;
315 }
316
317 match self.client.get_metadump_response().await {
318 Ok(MetadumpResponse::End) => {
319 self.done = true;
320 None
321 }
322 Ok(MetadumpResponse::BadClass(s)) => {
323 self.done = true;
324 Some(Err(Error::Protocol(MetadumpResponse::BadClass(s).into())))
325 }
326 Ok(MetadumpResponse::Busy(s)) => {
327 Some(Err(Error::Protocol(MetadumpResponse::Busy(s).into())))
328 }
329 Ok(MetadumpResponse::Entry(km)) => Some(Ok(km)),
330 Err(e) => Some(Err(e)),
331 }
332 }
333}