1use crate::{AsMemcachedValue, ErrorKind};
2use crate::{Client, Error, Response, Status, Value};
3
4use fxhash::FxHashMap;
5use std::future::Future;
6use tokio::io::AsyncWriteExt;
7
8const MAX_KEY_LENGTH: usize = 250; pub trait AsciiProtocol {
12 fn get<K: AsRef<[u8]>>(&mut self, key: K)
18 -> impl Future<Output = Result<Option<Value>, Error>>;
19
20 fn get_multi<I, K>(&mut self, keys: I) -> impl Future<Output = Result<Vec<Value>, Error>>
26 where
27 I: IntoIterator<Item = K>,
28 K: AsRef<[u8]>;
29
30 #[deprecated(
34 since = "0.4.0",
35 note = "This is now an alias for `get_multi`, and will be removed in the future."
36 )]
37 fn get_many<I, K>(&mut self, keys: I) -> impl Future<Output = Result<Vec<Value>, Error>>
38 where
39 I: IntoIterator<Item = K>,
40 K: AsRef<[u8]>;
41
42 fn set<K, V>(
47 &mut self,
48 key: K,
49 value: V,
50 ttl: Option<i64>,
51 flags: Option<u32>,
52 ) -> impl Future<Output = Result<(), Error>>
53 where
54 K: AsRef<[u8]>,
55 V: AsMemcachedValue;
56
57 fn set_multi<'a, K, V>(
62 &mut self,
63 kv: &'a [(K, V)],
64 ttl: Option<i64>,
65 flags: Option<u32>,
66 ) -> impl Future<Output = Result<FxHashMap<&'a K, Result<(), Error>>, Error>>
67 where
68 K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
69 V: AsMemcachedValue;
70
71 fn add<K, V>(
73 &mut self,
74 key: K,
75 value: V,
76 ttl: Option<i64>,
77 flags: Option<u32>,
78 ) -> impl Future<Output = Result<(), Error>>
79 where
80 K: AsRef<[u8]>,
81 V: AsMemcachedValue;
82
83 fn add_multi<'a, K, V>(
88 &mut self,
89 kv: &'a [(K, V)],
90 ttl: Option<i64>,
91 flags: Option<u32>,
92 ) -> impl Future<Output = Result<FxHashMap<&'a K, Result<(), Error>>, Error>>
93 where
94 K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
95 V: AsMemcachedValue;
96
97 fn delete_multi_no_reply<K>(&mut self, keys: &[K]) -> impl Future<Output = Result<(), Error>>
99 where
100 K: AsRef<[u8]>;
101
102 fn delete_no_reply<K>(&mut self, key: K) -> impl Future<Output = Result<(), Error>>
104 where
105 K: AsRef<[u8]>;
106
107 fn delete<K>(&mut self, key: K) -> impl Future<Output = Result<(), Error>>
109 where
110 K: AsRef<[u8]>;
111
112 fn increment<K>(&mut self, key: K, amount: u64) -> impl Future<Output = Result<u64, Error>>
117 where
118 K: AsRef<[u8]>;
119
120 fn increment_no_reply<K>(
124 &mut self,
125 key: K,
126 amount: u64,
127 ) -> impl Future<Output = Result<(), Error>>
128 where
129 K: AsRef<[u8]>;
130
131 fn decrement<K>(&mut self, key: K, amount: u64) -> impl Future<Output = Result<u64, Error>>
136 where
137 K: AsRef<[u8]>;
138
139 fn decrement_no_reply<K>(
143 &mut self,
144 key: K,
145 amount: u64,
146 ) -> impl Future<Output = Result<(), Error>>
147 where
148 K: AsRef<[u8]>;
149}
150
151impl AsciiProtocol for Client {
152 async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Option<Value>, Error> {
153 let kr = Self::validate_key_length(key.as_ref())?;
154
155 self.conn
156 .write_all(&[b"get ", kr, b"\r\n"].concat())
157 .await?;
158 self.conn.flush().await?;
159
160 match self.get_read_write_response().await? {
161 Response::Status(Status::NotFound) => Ok(None),
162 Response::Status(s) => Err(s.into()),
163 Response::Data(d) => d
164 .map(|mut items| {
165 if items.len() != 1 {
166 Err(Status::Error(ErrorKind::Protocol(None)).into())
167 } else {
168 Ok(items.remove(0))
169 }
170 })
171 .transpose(),
172 _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))),
173 }
174 }
175
176 async fn get_multi<I, K>(&mut self, keys: I) -> Result<Vec<Value>, Error>
177 where
178 I: IntoIterator<Item = K>,
179 K: AsRef<[u8]>,
180 {
181 self.conn.write_all(b"get").await?;
182 for key in keys {
183 if key.as_ref().len() > MAX_KEY_LENGTH {
184 continue;
185 }
186 self.conn.write_all(b" ").await?;
187 self.conn.write_all(key.as_ref()).await?;
188 }
189 self.conn.write_all(b"\r\n").await?;
190 self.conn.flush().await?;
191
192 match self.get_read_write_response().await? {
193 Response::Status(s) => Err(s.into()),
194 Response::Data(d) => d.ok_or(Status::NotFound.into()),
195 _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
196 }
197 }
198
199 async fn get_many<I, K>(&mut self, keys: I) -> Result<Vec<Value>, Error>
200 where
201 I: IntoIterator<Item = K>,
202 K: AsRef<[u8]>,
203 {
204 self.get_multi(keys).await
205 }
206
207 async fn set<K, V>(
208 &mut self,
209 key: K,
210 value: V,
211 ttl: Option<i64>,
212 flags: Option<u32>,
213 ) -> Result<(), Error>
214 where
215 K: AsRef<[u8]>,
216 V: AsMemcachedValue,
217 {
218 let kr = Self::validate_key_length(key.as_ref())?;
219 let vr = value.as_bytes();
220
221 self.conn.write_all(b"set ").await?;
222 self.conn.write_all(kr).await?;
223
224 let flags = flags.unwrap_or(0).to_string();
225 self.conn.write_all(b" ").await?;
226 self.conn.write_all(flags.as_ref()).await?;
227
228 let ttl = ttl.unwrap_or(0).to_string();
229 self.conn.write_all(b" ").await?;
230 self.conn.write_all(ttl.as_ref()).await?;
231
232 let vlen = vr.len().to_string();
233 self.conn.write_all(b" ").await?;
234 self.conn.write_all(vlen.as_ref()).await?;
235 self.conn.write_all(b"\r\n").await?;
236
237 self.conn.write_all(vr.as_ref()).await?;
238 self.conn.write_all(b"\r\n").await?;
239
240 self.conn.flush().await?;
241
242 match self.get_read_write_response().await? {
243 Response::Status(Status::Stored) => Ok(()),
244 Response::Status(s) => Err(s.into()),
245 _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
246 }
247 }
248
249 async fn set_multi<'a, K, V>(
250 &mut self,
251 kv: &'a [(K, V)],
252 ttl: Option<i64>,
253 flags: Option<u32>,
254 ) -> Result<FxHashMap<&'a K, Result<(), Error>>, Error>
255 where
256 K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
257 V: AsMemcachedValue,
258 {
259 for (key, value) in kv {
260 let kr = key.as_ref();
261 if kr.len() > MAX_KEY_LENGTH {
262 continue;
263 }
264
265 let vr = value.as_bytes();
266
267 self.conn.write_all(b"set ").await?;
268 self.conn.write_all(kr).await?;
269
270 let flags = flags.unwrap_or(0).to_string();
271 self.conn.write_all(b" ").await?;
272 self.conn.write_all(flags.as_ref()).await?;
273
274 let ttl = ttl.unwrap_or(0).to_string();
275 self.conn.write_all(b" ").await?;
276 self.conn.write_all(ttl.as_ref()).await?;
277
278 let vlen = vr.len().to_string();
279 self.conn.write_all(b" ").await?;
280 self.conn.write_all(vlen.as_ref()).await?;
281 self.conn.write_all(b"\r\n").await?;
282
283 self.conn.write_all(vr.as_ref()).await?;
284 self.conn.write_all(b"\r\n").await?;
285 }
286 self.conn.flush().await?;
287
288 let results = self.map_set_multi_responses(kv).await?;
289
290 Ok(results)
291 }
292
293 async fn add<K, V>(
294 &mut self,
295 key: K,
296 value: V,
297 ttl: Option<i64>,
298 flags: Option<u32>,
299 ) -> Result<(), Error>
300 where
301 K: AsRef<[u8]>,
302 V: AsMemcachedValue,
303 {
304 let kr = Self::validate_key_length(key.as_ref())?;
305 let vr = value.as_bytes();
306
307 self.conn.write_all(b"add ").await?;
308 self.conn.write_all(kr).await?;
309
310 let flags = flags.unwrap_or(0).to_string();
311 self.conn.write_all(b" ").await?;
312 self.conn.write_all(flags.as_ref()).await?;
313
314 let ttl = ttl.unwrap_or(0).to_string();
315 self.conn.write_all(b" ").await?;
316 self.conn.write_all(ttl.as_ref()).await?;
317
318 let vlen = vr.len().to_string();
319 self.conn.write_all(b" ").await?;
320 self.conn.write_all(vlen.as_ref()).await?;
321 self.conn.write_all(b"\r\n").await?;
322
323 self.conn.write_all(vr.as_ref()).await?;
324 self.conn.write_all(b"\r\n").await?;
325
326 self.conn.flush().await?;
327
328 match self.get_read_write_response().await? {
329 Response::Status(Status::Stored) => Ok(()),
330 Response::Status(s) => Err(s.into()),
331 _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
332 }
333 }
334
335 async fn add_multi<'a, K, V>(
336 &mut self,
337 kv: &'a [(K, V)],
338 ttl: Option<i64>,
339 flags: Option<u32>,
340 ) -> Result<FxHashMap<&'a K, Result<(), Error>>, Error>
341 where
342 K: AsRef<[u8]> + Eq + std::hash::Hash + std::fmt::Debug,
343 V: AsMemcachedValue,
344 {
345 for (key, value) in kv {
346 let kr = key.as_ref();
347 if kr.len() > MAX_KEY_LENGTH {
348 continue;
349 }
350
351 let vr = value.as_bytes();
352
353 self.conn.write_all(b"add ").await?;
354 self.conn.write_all(kr).await?;
355
356 let flags = flags.unwrap_or(0).to_string();
357 self.conn.write_all(b" ").await?;
358 self.conn.write_all(flags.as_ref()).await?;
359
360 let ttl = ttl.unwrap_or(0).to_string();
361 self.conn.write_all(b" ").await?;
362 self.conn.write_all(ttl.as_ref()).await?;
363
364 let vlen = vr.len().to_string();
365 self.conn.write_all(b" ").await?;
366 self.conn.write_all(vlen.as_ref()).await?;
367 self.conn.write_all(b"\r\n").await?;
368
369 self.conn.write_all(vr.as_ref()).await?;
370 self.conn.write_all(b"\r\n").await?;
371 }
372 self.conn.flush().await?;
373
374 let results = self.map_set_multi_responses(kv).await?;
375
376 Ok(results)
377 }
378
379 async fn delete_no_reply<K>(&mut self, key: K) -> Result<(), Error>
381 where
382 K: AsRef<[u8]>,
383 {
384 let kr = Self::validate_key_length(key.as_ref())?;
385
386 self.conn
387 .write_all(&[b"delete ", kr, b" noreply\r\n"].concat())
388 .await?;
389 self.conn.flush().await?;
390 Ok(())
391 }
392
393 async fn delete<K>(&mut self, key: K) -> Result<(), Error>
395 where
396 K: AsRef<[u8]>,
397 {
398 let kr = Self::validate_key_length(key.as_ref())?;
399
400 self.conn
401 .write_all(&[b"delete ", kr, b"\r\n"].concat())
402 .await?;
403 self.conn.flush().await?;
404
405 match self.get_read_write_response().await? {
406 Response::Status(Status::Deleted) => Ok(()),
407 Response::Status(s) => Err(s.into()),
408 _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
409 }
410 }
411
412 async fn delete_multi_no_reply<K>(&mut self, keys: &[K]) -> Result<(), Error>
413 where
414 K: AsRef<[u8]>,
415 {
416 for key in keys {
417 let kr = key.as_ref();
418 if kr.len() > MAX_KEY_LENGTH {
419 continue;
420 }
421
422 self.conn.write_all(b"delete ").await?;
423 self.conn.write_all(kr).await?;
424 self.conn.write_all(b" noreply\r\n").await?;
425 }
426 self.conn.flush().await?;
427
428 Ok(())
429 }
430
431 async fn increment<K>(&mut self, key: K, amount: u64) -> Result<u64, Error>
432 where
433 K: AsRef<[u8]>,
434 {
435 let kr = Self::validate_key_length(key.as_ref())?;
436
437 self.conn
438 .write_all(&[b"incr ", kr, b" ", amount.to_string().as_bytes(), b"\r\n"].concat())
439 .await?;
440 self.conn.flush().await?;
441
442 match self.get_read_write_response().await? {
443 Response::Status(s) => Err(s.into()),
444 Response::IncrDecr(amount) => Ok(amount),
445 _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
446 }
447 }
448
449 async fn increment_no_reply<K>(&mut self, key: K, amount: u64) -> Result<(), Error>
450 where
451 K: AsRef<[u8]>,
452 {
453 let kr = Self::validate_key_length(key.as_ref())?;
454
455 self.conn
456 .write_all(
457 &[
458 b"incr ",
459 kr,
460 b" ",
461 amount.to_string().as_bytes(),
462 b" noreply\r\n",
463 ]
464 .concat(),
465 )
466 .await?;
467 self.conn.flush().await?;
468
469 Ok(())
470 }
471
472 async fn decrement<K>(&mut self, key: K, amount: u64) -> Result<u64, Error>
473 where
474 K: AsRef<[u8]>,
475 {
476 let kr = Self::validate_key_length(key.as_ref())?;
477
478 self.conn
479 .write_all(&[b"decr ", kr, b" ", amount.to_string().as_bytes(), b"\r\n"].concat())
480 .await?;
481 self.conn.flush().await?;
482
483 match self.get_read_write_response().await? {
484 Response::Status(s) => Err(s.into()),
485 Response::IncrDecr(amount) => Ok(amount),
486 _ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
487 }
488 }
489
490 async fn decrement_no_reply<K>(&mut self, key: K, amount: u64) -> Result<(), Error>
491 where
492 K: AsRef<[u8]>,
493 {
494 let kr = Self::validate_key_length(key.as_ref())?;
495
496 self.conn
497 .write_all(
498 &[
499 b"decr ",
500 kr,
501 b" ",
502 amount.to_string().as_bytes(),
503 b" noreply\r\n",
504 ]
505 .concat(),
506 )
507 .await?;
508 self.conn.flush().await?;
509
510 Ok(())
511 }
512}