1use std::sync::Arc;
2use tokio::{
3 io::{AsyncReadExt, AsyncWriteExt},
4 net::TcpStream,
5 sync::Mutex,
6};
7
8const REQUEST_MAGIC: u8 = 0x80;
9const RESPONSE_MAGIC: u8 = 0x81;
10
11pub struct MemcacheClient {
12 stream: Arc<Mutex<TcpStream>>,
13}
14
15impl MemcacheClient {
16 pub async fn connect(addr: &str) -> Self {
17 let stream = TcpStream::connect(addr)
18 .await
19 .expect("Failed to connect to Memcached");
20 stream.set_nodelay(true).unwrap();
21 Self {
22 stream: Arc::new(Mutex::new(stream)),
23 }
24 }
25
26 #[inline(always)]
27 pub async fn set(
28 &self,
29 key: &str,
30 value: &[u8],
31 flags: u32,
32 exptime: u32,
33 ) -> Result<(), String> {
34 let mut buf = Vec::with_capacity(128);
35 encode_set_into(&mut buf, key, value, flags, exptime);
36 let mut stream = self.stream.lock().await;
37 stream
38 .write_all(&buf)
39 .await
40 .map_err(|e| format!("Write error: {e}"))?;
41 let mut header = [0u8; 24];
42 stream
43 .read_exact(&mut header)
44 .await
45 .map_err(|e| format!("Read error: {e}"))?;
46 if header[0] != RESPONSE_MAGIC {
47 return Err("Invalid response magic".to_string());
48 }
49 let status = u16::from_be_bytes([header[6], header[7]]);
50 let total_body_len =
51 u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
52 if total_body_len > 0 {
53 let mut body = vec![0u8; total_body_len];
54 stream
55 .read_exact(&mut body)
56 .await
57 .map_err(|e| format!("Read error: {e}"))?;
58 }
59 if status == 0 {
60 Ok(())
61 } else {
62 Err(format!("SET error status: {status}"))
63 }
64 }
65
66 #[inline(always)]
68 pub async fn set_quietly(&self, key: &str, value: &[u8]) -> Result<(), String> {
69 let mut buf = Vec::with_capacity(128);
70 encode_set_quietly_into(&mut buf, key, value);
71 let mut stream = self.stream.lock().await;
72 stream
73 .write_all(&buf)
74 .await
75 .map_err(|e| format!("Write error: {e}"))?;
76 Ok(())
77 }
78
79 #[inline(always)]
80 pub async fn set_multi<'a, I>(&self, items: I, flags: u32, exptime: u32) -> Result<(), String>
81 where
82 I: IntoIterator<Item = (&'a str, &'a [u8])>,
83 {
84 let mut stream = self.stream.lock().await;
85 let mut batch_buf = Vec::with_capacity(4096);
86 let mut count = 0;
87 for (key, value) in items.into_iter() {
88 encode_set_into(&mut batch_buf, key, value, flags, exptime);
89 count += 1; }
91 if !batch_buf.is_empty() {
92 stream
93 .write_all(&batch_buf)
94 .await
95 .map_err(|e| format!("Write error: {e}"))?;
96 for _ in 0..count {
98 let mut header = [0u8; 24];
99 stream
100 .read_exact(&mut header)
101 .await
102 .map_err(|e| format!("Read error: {e}"))?;
103 if header[0] != RESPONSE_MAGIC {
104 return Err("Invalid response magic".to_string());
105 }
106 let status = u16::from_be_bytes([header[6], header[7]]);
107 let total_body_len =
108 u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
109 if total_body_len > 0 {
110 let mut body = vec![0u8; total_body_len];
111 stream
112 .read_exact(&mut body)
113 .await
114 .map_err(|e| format!("Read error: {e}"))?;
115 }
116 if status != 0 {
117 return Err(format!("SET error status: {status}"));
118 }
119 }
120 }
121 Ok(())
122 }
123
124 #[inline(always)]
126 pub async fn set_multi_quietly<'a, I>(&self, items: I) -> Result<(), String>
127 where
128 I: IntoIterator<Item = (&'a str, &'a [u8])>,
129 {
130 let mut stream = self.stream.lock().await;
131 let mut batch_buf = Vec::with_capacity(4096);
132 let mut iter = items.into_iter().peekable();
133
134 if iter.peek().is_none() {
135 return Ok(());
136 }
137
138 while let Some((key, value)) = iter.next() {
139 if iter.peek().is_some() {
140 encode_set_quietly_into(&mut batch_buf, key, value);
141 } else {
142 encode_set_into(&mut batch_buf, key, value, 0, 0);
143 }
144 }
145
146 stream
147 .write_all(&batch_buf)
148 .await
149 .map_err(|e| format!("Write error: {e}"))?;
150
151 let mut header = [0u8; 24];
153 stream
154 .read_exact(&mut header)
155 .await
156 .map_err(|e| format!("Read error: {e}"))?;
157 if header[0] != RESPONSE_MAGIC {
158 return Err("Invalid response magic".to_string());
159 }
160 let status = u16::from_be_bytes([header[6], header[7]]);
161 if status != 0 {
162 return Err(format!("SET error status: {status}"));
163 }
164 Ok(())
165 }
166
167 #[inline(always)]
168 pub async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, String> {
169 let req = encode_get(key);
170 let mut stream = self.stream.lock().await;
171 stream
172 .write_all(&req)
173 .await
174 .map_err(|e| format!("Write error: {e}"))?;
175 let mut header = [0u8; 24];
176 stream
177 .read_exact(&mut header)
178 .await
179 .map_err(|e| format!("Read error: {e}"))?;
180 if header[0] != RESPONSE_MAGIC {
181 return Err("Invalid response magic".to_string());
182 }
183 let status = u16::from_be_bytes([header[6], header[7]]);
184 let total_body_len =
185 u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
186 let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
187 let extras_len = header[4] as usize;
188 let mut body = vec![0u8; total_body_len];
189 if total_body_len > 0 {
190 stream
191 .read_exact(&mut body)
192 .await
193 .map_err(|e| format!("Read error: {e}"))?;
194 }
195 if status == 0 {
196 let value = body[extras_len + key_len..].to_vec();
197 Ok(Some(value))
198 } else if status == 1 {
199 Ok(None)
200 } else {
201 Err(format!("GET error status: {status}"))
202 }
203 }
204
205 #[inline(always)]
206 pub async fn get_multi<'a, I>(&self, keys: I) -> Result<Vec<(String, Option<Vec<u8>>)>, String>
207 where
208 I: IntoIterator<Item = &'a str>,
209 {
210 let mut stream = self.stream.lock().await;
211 let mut batch_buf = Vec::with_capacity(4096);
212 let mut key_vec = Vec::new();
213 for key in keys.into_iter() {
214 encode_get_into(&mut batch_buf, key);
215 key_vec.push(key.to_string());
216 }
217 if !batch_buf.is_empty() {
218 stream
219 .write_all(&batch_buf)
220 .await
221 .map_err(|e| format!("Write error: {e}"))?;
222 }
223 let mut results = Vec::new();
224 for key in key_vec {
225 let mut header = [0u8; 24];
226 stream
227 .read_exact(&mut header)
228 .await
229 .map_err(|e| format!("Read error: {e}"))?;
230 if header[0] != RESPONSE_MAGIC {
231 return Err("Invalid response magic".to_string());
232 }
233 let status = u16::from_be_bytes([header[6], header[7]]);
234 let total_body_len =
235 u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
236 let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
237 let extras_len = header[4] as usize;
238 let mut body = vec![0u8; total_body_len];
239 if total_body_len > 0 {
240 stream
241 .read_exact(&mut body)
242 .await
243 .map_err(|e| format!("Read error: {e}"))?;
244 }
245 if status == 0 {
246 let value = body[extras_len + key_len..].to_vec();
247 results.push((key, Some(value)));
248 } else if status == 1 {
249 results.push((key, None));
250 } else {
251 return Err(format!("GET error status: {status}"));
252 }
253 }
254 Ok(results)
255 }
256
257 #[inline(always)]
259 pub async fn delete(&self, key: &str) -> Result<(), String> {
260 let mut buf = Vec::with_capacity(64);
261 encode_delete_into(&mut buf, key);
262 let mut stream = self.stream.lock().await;
263 stream
264 .write_all(&buf)
265 .await
266 .map_err(|e| format!("Write error: {e}"))?;
267 let mut header = [0u8; 24];
268 stream
269 .read_exact(&mut header)
270 .await
271 .map_err(|e| format!("Read error: {e}"))?;
272 if header[0] != RESPONSE_MAGIC {
273 return Err("Invalid response magic".to_string());
274 }
275 let status = u16::from_be_bytes([header[6], header[7]]);
276 if status == 0 {
277 Ok(())
278 } else if status == 1 {
279 Ok(())
281 } else {
282 Err(format!("DELETE error status: {status}"))
283 }
284 }
285
286 #[inline(always)]
288 pub async fn delete_quietly(&self, key: &str) -> Result<(), String> {
289 let mut buf = Vec::with_capacity(64);
290 encode_delete_quiet_into(&mut buf, key);
291 let mut stream = self.stream.lock().await;
292 stream
293 .write_all(&buf)
294 .await
295 .map_err(|e| format!("Write error: {e}"))?;
296 Ok(())
297 }
298
299 #[inline(always)]
301 pub async fn delete_multi<'a, I>(&self, keys: I) -> Result<(), String>
302 where
303 I: IntoIterator<Item = &'a str>,
304 {
305 let mut stream = self.stream.lock().await;
306 let mut batch_buf = Vec::with_capacity(4096);
307 let mut count = 0;
308 for key in keys.into_iter() {
309 encode_delete_into(&mut batch_buf, key);
310 count += 1;
311 }
312 if !batch_buf.is_empty() {
313 stream
314 .write_all(&batch_buf)
315 .await
316 .map_err(|e| format!("Write error: {e}"))?;
317 for _ in 0..count {
318 let mut header = [0u8; 24];
319 stream
320 .read_exact(&mut header)
321 .await
322 .map_err(|e| format!("Read error: {e}"))?;
323 if header[0] != RESPONSE_MAGIC {
324 return Err("Invalid response magic".to_string());
325 }
326 let status = u16::from_be_bytes([header[6], header[7]]);
327 if status != 0 && status != 1 {
328 return Err(format!("DELETE error status: {status}"));
329 }
330 }
331 }
332 Ok(())
333 }
334
335 #[inline(always)]
337 pub async fn delete_multi_quietly<'a, I>(&self, keys: I) -> Result<(), String>
338 where
339 I: IntoIterator<Item = &'a str>,
340 {
341 let mut stream = self.stream.lock().await;
342 let mut batch_buf = Vec::with_capacity(4096);
343 let mut iter = keys.into_iter().peekable();
344
345 if iter.peek().is_none() {
346 return Ok(());
347 }
348
349 while let Some(key) = iter.next() {
350 if iter.peek().is_some() {
351 encode_delete_quiet_into(&mut batch_buf, key);
352 } else {
353 encode_delete_into(&mut batch_buf, key);
354 }
355 }
356
357 stream
358 .write_all(&batch_buf)
359 .await
360 .map_err(|e| format!("Write error: {e}"))?;
361
362 let mut header = [0u8; 24];
364 stream
365 .read_exact(&mut header)
366 .await
367 .map_err(|e| format!("Read error: {e}"))?;
368 if header[0] != RESPONSE_MAGIC {
369 return Err("Invalid response magic".to_string());
370 }
371 let status = u16::from_be_bytes([header[6], header[7]]);
372 if status != 0 && status != 1 {
373 return Err(format!("DELETE error status: {status}"));
374 }
375 Ok(())
376 }
377
378 #[inline(always)]
380 pub async fn incr(
381 &self,
382 key: &str,
383 delta: u64,
384 initial: u64,
385 exptime: u32,
386 ) -> Result<Option<u64>, String> {
387 let mut buf = Vec::with_capacity(64);
388 encode_incr_decr_into(&mut buf, key, delta, initial, exptime, true);
389 let mut stream = self.stream.lock().await;
390 stream
391 .write_all(&buf)
392 .await
393 .map_err(|e| format!("Write error: {e}"))?;
394 let mut header = [0u8; 24];
395 stream
396 .read_exact(&mut header)
397 .await
398 .map_err(|e| format!("Read error: {e}"))?;
399 if header[0] != RESPONSE_MAGIC {
400 return Err("Invalid response magic".to_string());
401 }
402 let status = u16::from_be_bytes([header[6], header[7]]);
403 let total_body_len =
404 u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
405 let mut body = vec![0u8; total_body_len];
406 if total_body_len > 0 {
407 stream
408 .read_exact(&mut body)
409 .await
410 .map_err(|e| format!("Read error: {e}"))?;
411 }
412 if status == 0 {
413 let val = u64::from_be_bytes(body[..8].try_into().unwrap());
415 Ok(Some(val))
416 } else if status == 1 {
417 Ok(None)
418 } else {
419 Err(format!("INCR error status: {status}"))
420 }
421 }
422
423 #[inline(always)]
425 pub async fn decr(
426 &self,
427 key: &str,
428 delta: u64,
429 initial: u64,
430 exptime: u32,
431 ) -> Result<Option<u64>, String> {
432 let mut buf = Vec::with_capacity(64);
433 encode_incr_decr_into(&mut buf, key, delta, initial, exptime, false);
434 let mut stream = self.stream.lock().await;
435 stream
436 .write_all(&buf)
437 .await
438 .map_err(|e| format!("Write error: {e}"))?;
439 let mut header = [0u8; 24];
440 stream
441 .read_exact(&mut header)
442 .await
443 .map_err(|e| format!("Read error: {e}"))?;
444 if header[0] != RESPONSE_MAGIC {
445 return Err("Invalid response magic".to_string());
446 }
447 let status = u16::from_be_bytes([header[6], header[7]]);
448 let total_body_len =
449 u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
450 let mut body = vec![0u8; total_body_len];
451 if total_body_len > 0 {
452 stream
453 .read_exact(&mut body)
454 .await
455 .map_err(|e| format!("Read error: {e}"))?;
456 }
457 if status == 0 {
458 let val = u64::from_be_bytes(body[..8].try_into().unwrap());
459 Ok(Some(val))
460 } else if status == 1 {
461 Ok(None)
462 } else {
463 Err(format!("DECR error status: {status}"))
464 }
465 }
466
467 #[inline(always)]
469 pub async fn noop(&self) -> Result<(), String> {
470 let mut buf = Vec::with_capacity(24);
471 encode_noop_into(&mut buf);
472 let mut stream = self.stream.lock().await;
473 stream
474 .write_all(&buf)
475 .await
476 .map_err(|e| format!("Write error: {e}"))?;
477 let mut header = [0u8; 24];
478 stream
479 .read_exact(&mut header)
480 .await
481 .map_err(|e| format!("Read error: {e}"))?;
482 if header[0] != RESPONSE_MAGIC {
483 return Err("Invalid response magic".to_string());
484 }
485 let status = u16::from_be_bytes([header[6], header[7]]);
486 if status == 0 {
487 Ok(())
488 } else {
489 Err(format!("NOOP error status: {status}"))
490 }
491 }
492}
493
494fn encode_get(key: &str) -> Vec<u8> {
497 let mut buf = Vec::with_capacity(24 + key.len());
498 encode_get_into(&mut buf, key);
499 buf
500}
501
502fn encode_get_into(buf: &mut Vec<u8>, key: &str) {
503 buf.reserve(24 + key.len());
504 let key_bytes = key.as_bytes();
505 let total_body_len = key_bytes.len() as u32;
506 buf.push(REQUEST_MAGIC); buf.push(0x00); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); buf.push(0); buf.push(0); buf.extend_from_slice(&[0, 0]); buf.extend_from_slice(&total_body_len.to_be_bytes()); buf.extend_from_slice(&[0, 0, 0, 0]); buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); buf.extend_from_slice(key_bytes);
516}
517
518fn encode_set_into(buf: &mut Vec<u8>, key: &str, value: &[u8], flags: u32, exptime: u32) {
519 buf.reserve(24 + key.len() + value.len() + 8);
520 let key_bytes = key.as_bytes();
521 let extras_len = 8;
522 let total_body_len = (extras_len + key_bytes.len() + value.len()) as u32;
523 buf.push(REQUEST_MAGIC);
524 buf.push(0x01); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
526 buf.push(extras_len as u8);
527 buf.push(0);
528 buf.extend_from_slice(&[0, 0]);
529 buf.extend_from_slice(&total_body_len.to_be_bytes());
530 buf.extend_from_slice(&[0, 0, 0, 0]);
531 buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]);
532 buf.extend_from_slice(&flags.to_be_bytes());
533 buf.extend_from_slice(&exptime.to_be_bytes());
534 buf.extend_from_slice(key_bytes);
535 buf.extend_from_slice(value);
536}
537
538fn encode_set_quietly_into(buf: &mut Vec<u8>, key: &str, value: &[u8]) {
539 buf.reserve(24 + key.len() + value.len() + 8);
540 let key_bytes = key.as_bytes();
541 let flags: u32 = 0;
542 let exptime: u32 = 0;
543 let extras_len = 8;
544 let total_body_len = (extras_len + key_bytes.len() + value.len()) as u32;
545 buf.push(REQUEST_MAGIC);
546 buf.push(0x11); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
548 buf.push(extras_len as u8);
549 buf.push(0);
550 buf.extend_from_slice(&[0, 0]);
551 buf.extend_from_slice(&total_body_len.to_be_bytes());
552 buf.extend_from_slice(&[0, 0, 0, 0]);
553 buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]);
554 buf.extend_from_slice(&flags.to_be_bytes());
555 buf.extend_from_slice(&exptime.to_be_bytes());
556 buf.extend_from_slice(key_bytes);
557 buf.extend_from_slice(value);
558}
559
560fn encode_delete_into(buf: &mut Vec<u8>, key: &str) {
561 buf.reserve(24 + key.len());
562 let key_bytes = key.as_bytes();
563 let total_body_len = key_bytes.len() as u32;
564 buf.push(REQUEST_MAGIC); buf.push(0x04); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); buf.push(0); buf.push(0); buf.extend_from_slice(&[0, 0]); buf.extend_from_slice(&total_body_len.to_be_bytes()); buf.extend_from_slice(&[0, 0, 0, 0]); buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); buf.extend_from_slice(key_bytes);
574}
575
576fn encode_delete_quiet_into(buf: &mut Vec<u8>, key: &str) {
577 buf.reserve(24 + key.len());
578 let key_bytes = key.as_bytes();
579 let total_body_len = key_bytes.len() as u32;
580 buf.push(REQUEST_MAGIC); buf.push(0x14); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); buf.push(0); buf.push(0); buf.extend_from_slice(&[0, 0]); buf.extend_from_slice(&total_body_len.to_be_bytes()); buf.extend_from_slice(&[0, 0, 0, 0]); buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); buf.extend_from_slice(key_bytes);
590}
591
592fn encode_incr_decr_into(
593 buf: &mut Vec<u8>,
594 key: &str,
595 delta: u64,
596 initial: u64,
597 exptime: u32,
598 incr: bool,
599) {
600 buf.reserve(24 + key.len() + 20);
601 let key_bytes = key.as_bytes();
602 let extras_len = 20;
603 let total_body_len = (extras_len + key_bytes.len()) as u32;
604 buf.push(REQUEST_MAGIC);
605 buf.push(if incr { 0x05 } else { 0x06 }); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
607 buf.push(extras_len as u8);
608 buf.push(0);
609 buf.extend_from_slice(&[0, 0]);
610 buf.extend_from_slice(&total_body_len.to_be_bytes());
611 buf.extend_from_slice(&[0, 0, 0, 0]);
612 buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]);
613 buf.extend_from_slice(&delta.to_be_bytes());
614 buf.extend_from_slice(&initial.to_be_bytes());
615 buf.extend_from_slice(&exptime.to_be_bytes());
616 buf.extend_from_slice(key_bytes);
617}
618
619fn encode_noop_into(buf: &mut Vec<u8>) {
620 buf.reserve(24);
621 buf.push(REQUEST_MAGIC);
622 buf.push(0x0a); buf.extend_from_slice(&[0, 0]); buf.push(0); buf.push(0); buf.extend_from_slice(&[0, 0]); buf.extend_from_slice(&0u32.to_be_bytes()); buf.extend_from_slice(&[0, 0, 0, 0]); buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); }
631
632#[cfg(test)]
633mod tests {
634 use super::*;
635
636 #[tokio::test]
637 async fn test_set_and_get() {
638 let client = MemcacheClient::connect("127.0.0.1:11211").await;
639 client.set("foo", b"bar", 0, 0).await.expect("SET failed");
640 let val = client.get("foo").await.expect("GET failed");
641 assert_eq!(val, Some(b"bar".to_vec()));
642 }
643
644 #[tokio::test]
645 async fn test_get_missing() {
646 let client = MemcacheClient::connect("127.0.0.1:11211").await;
647 let val = client.get("no_such_key").await.expect("GET failed");
648 assert_eq!(val, None);
649 }
650
651 #[tokio::test]
652 async fn test_set_multi_and_get_single_multiple_times() {
653 let client = MemcacheClient::connect("127.0.0.1:11211").await;
654 client
655 .set_multi(
656 vec![("foo", b"bar" as &[u8]), ("baz", b"qux"), ("num", b"42")],
657 0,
658 0,
659 )
660 .await
661 .expect("set_multi failed");
662
663 assert_eq!(client.get("foo").await.unwrap(), Some(b"bar".to_vec()));
664 assert_eq!(client.get("baz").await.unwrap(), Some(b"qux".to_vec()));
665 assert_eq!(client.get("num").await.unwrap(), Some(b"42".to_vec()));
666 }
667
668 #[tokio::test]
669 async fn test_get_multi() {
670 let client = MemcacheClient::connect("127.0.0.1:11211").await;
671 client.set("foo", b"bar", 0, 0).await.unwrap();
672 client.set("baz", b"qux", 0, 0).await.unwrap();
673
674 let results = client
675 .get_multi(vec!["foo", "baz", "missing"])
676 .await
677 .expect("get_multi failed");
678
679 assert_eq!(results[0], ("foo".to_string(), Some(b"bar".to_vec())));
680 assert_eq!(results[1], ("baz".to_string(), Some(b"qux".to_vec())));
681 assert_eq!(results[2], ("missing".to_string(), None));
682 }
683
684 #[tokio::test]
685 async fn test_set_multi_get_multi() {
686 let client = MemcacheClient::connect("127.0.0.1:11211").await;
687 client
688 .set_multi(
689 vec![("foo", b"bar" as &[u8]), ("baz", b"qux"), ("num", b"42")],
690 0,
691 0,
692 )
693 .await
694 .expect("set_multi failed");
695
696 let results = client
697 .get_multi(vec!["foo", "baz", "num"])
698 .await
699 .expect("get_multi failed");
700
701 assert_eq!(results[0], ("foo".to_string(), Some(b"bar".to_vec())));
702 assert_eq!(results[1], ("baz".to_string(), Some(b"qux".to_vec())));
703 assert_eq!(results[2], ("num".to_string(), Some(b"42".to_vec())));
704 }
705
706 #[tokio::test]
707 async fn bench_set_1000() {
708 let client = MemcacheClient::connect("127.0.0.1:11211").await;
709 let start = std::time::Instant::now();
710 for i in 0..1000 {
711 let key = format!("bench_set_1000_{i}");
712 client.set(&key, b"val", 0, 0).await.unwrap();
713 }
714 let elapsed = start.elapsed();
715 println!("bench_set_1000: {:?}", elapsed);
716 println!("per unit: {:?}", elapsed / 1000);
717 }
718
719 #[tokio::test]
720 async fn bench_get_1000() {
721 let client = MemcacheClient::connect("127.0.0.1:11211").await;
722 for i in 0..1000 {
724 let key = format!("bench_get_1000_{i}");
725 client.set(&key, b"val", 0, 0).await.unwrap();
726 }
727 let start = std::time::Instant::now();
728 for i in 0..1000 {
729 let key = format!("bench_get_1000_{i}");
730 let val = client.get(&key).await.unwrap();
731 assert_eq!(val, Some(b"val".to_vec()));
732 }
733 let elapsed = start.elapsed();
734 println!("bench_get_1000: {:?}", elapsed);
735 println!("per unit: {:?}", elapsed / 1000);
736 }
737
738 #[tokio::test]
739 async fn bench_set_multi_1000() {
740 let client = MemcacheClient::connect("127.0.0.1:11211").await;
741 let items: Vec<_> = (0..100)
742 .map(|i| (format!("bench_set_multi_1000_{i}"), b"val" as &[u8]))
743 .collect();
744 let items_ref: Vec<(&str, &[u8])> = items.iter().map(|(k, v)| (k.as_str(), *v)).collect();
745 let start = std::time::Instant::now();
746 client.set_multi(items_ref, 0, 0).await.unwrap();
747 let elapsed = start.elapsed();
748 println!("bench_set_multi_1000: {:?}", elapsed);
749 println!("per unit: {:?}", elapsed / 1000);
750 }
751
752 #[tokio::test]
753 async fn bench_get_multi_1000() {
754 let client = MemcacheClient::connect("127.0.0.1:11211").await;
755 let items: Vec<_> = (0..1000)
757 .map(|i| (format!("bench_get_multi_1000_{i}"), b"val" as &[u8]))
758 .collect();
759 let items_ref: Vec<(&str, &[u8])> = items.iter().map(|(k, v)| (k.as_str(), *v)).collect();
760 client.set_multi(items_ref, 0, 0).await.unwrap();
761
762 let keys: Vec<_> = (0..100)
763 .map(|i| format!("bench_get_multi_1000_{i}"))
764 .collect();
765 let keys_ref: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
766
767 let start = std::time::Instant::now();
768 let results = client.get_multi(keys_ref).await.unwrap();
769 for (_k, v) in results {
770 assert_eq!(v, Some(b"val".to_vec()));
771 }
772 let elapsed = start.elapsed();
773 println!("bench_get_multi_1000: {:?}", elapsed);
774 println!("per unit: {:?}", elapsed / 1000);
775 }
776
777 #[tokio::test]
778 async fn test_set_quietly_and_get() {
779 let client = MemcacheClient::connect("127.0.0.1:11211").await;
780 client.set_quietly("quiet_key", b"quiet_val").await.unwrap();
781 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
782 let val = client.get("quiet_key").await.unwrap();
783 assert_eq!(val, Some(b"quiet_val".to_vec()));
784 }
785
786 #[tokio::test]
787 async fn bench_set_quietly_1000() {
788 let client = MemcacheClient::connect("127.0.0.1:11211").await;
789 let start = std::time::Instant::now();
790 for i in 0..1000 {
791 let key = format!("bench_set_quietly_1000_{i}");
792 client.set_quietly(&key, b"val").await.unwrap();
793 }
794 let elapsed = start.elapsed();
795 println!("bench_set_quietly_1000: {:?}", elapsed);
796 println!("per unit: {:?}", elapsed / 1000);
797 }
798
799 #[tokio::test]
800 async fn test_set_multi_quiet_and_get() {
801 let client = MemcacheClient::connect("127.0.0.1:11211").await;
802 client
803 .set_multi_quietly(vec![
804 ("quiet_batch_foo", b"bar" as &[u8]),
805 ("quiet_batch_baz", b"qux"),
806 ("quiet_batch_num", b"42"),
807 ])
808 .await
809 .expect("set_multi_quiet failed");
810 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
811 assert_eq!(
812 client.get("quiet_batch_foo").await.unwrap(),
813 Some(b"bar".to_vec())
814 );
815 assert_eq!(
816 client.get("quiet_batch_baz").await.unwrap(),
817 Some(b"qux".to_vec())
818 );
819 assert_eq!(
820 client.get("quiet_batch_num").await.unwrap(),
821 Some(b"42".to_vec())
822 );
823 }
824
825 #[tokio::test]
826 async fn test_set_multi_quiet_empty() {
827 let client = MemcacheClient::connect("127.0.0.1:11211").await;
828 client
829 .set_multi_quietly(Vec::<(&str, &[u8])>::new())
830 .await
831 .unwrap();
832 }
833
834 #[tokio::test]
835 async fn test_set_multi_quiet_single() {
836 let client = MemcacheClient::connect("127.0.0.1:11211").await;
837 client
838 .set_multi_quietly(vec![("quiet_batch_single", b"only" as &[u8])])
839 .await
840 .expect("set_multi_quiet failed");
841 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
842 assert_eq!(
843 client.get("quiet_batch_single").await.unwrap(),
844 Some(b"only".to_vec())
845 );
846 }
847
848 #[tokio::test]
849 async fn bench_set_multi_quiet_1000() {
850 let client = MemcacheClient::connect("127.0.0.1:11211").await;
851 let items: Vec<_> = (0..1000)
852 .map(|i| (format!("bench_set_multi_quiet_1000_{i}"), b"val" as &[u8]))
853 .collect();
854 let items_ref: Vec<(&str, &[u8])> = items.iter().map(|(k, v)| (k.as_str(), *v)).collect();
855 let start = std::time::Instant::now();
856 client.set_multi_quietly(items_ref).await.unwrap();
857 let elapsed = start.elapsed();
858 println!("bench_set_multi_quiet_1000: {:?}", elapsed);
859 println!("per unit: {:?}", elapsed / 1000);
860 }
861
862 #[tokio::test]
863 async fn bench_set_multi_quiet_three_pods_1000_times() {
864 let client = MemcacheClient::connect("127.0.0.1:11211").await;
865
866 let pods = [
868 (0u8, 12345678f64, "bench_ts_key_0", "bench_ep_key_0"),
869 (1u8, 22345678f64, "bench_ts_key_1", "bench_ep_key_1"),
870 (2u8, 32345678f64, "bench_ts_key_2", "bench_ep_key_2"),
871 ];
872
873 let start = std::time::Instant::now();
874
875 for _i in 0..1000 {
876 let mut vec_to_persist: Vec<(&str, Vec<u8>)> = Vec::with_capacity(6);
877 for (trade_side, entry_price, trade_side_key, entry_price_key) in &pods {
878 vec_to_persist.push((*trade_side_key, trade_side.to_le_bytes().to_vec()));
879 vec_to_persist.push((*entry_price_key, entry_price.to_le_bytes().to_vec()));
880 }
881 let vec_to_persist_refs: Vec<(&str, &[u8])> = vec_to_persist
882 .iter()
883 .map(|(k, v)| (*k, v.as_slice()))
884 .collect();
885
886 client.set_multi_quietly(vec_to_persist_refs).await.unwrap();
887 }
888
889 let elapsed = start.elapsed();
890 println!(
891 "bench_set_multi_quiet_three_pods_1000_times: {:?} ({} sets of 3 pods, {} keys total)",
892 elapsed,
893 1000,
894 1000 * 6
895 );
896 println!("per unit: {:?}", elapsed / 1000);
897 }
898
899 #[tokio::test]
900 async fn test_delete_and_get() {
901 let client = MemcacheClient::connect("127.0.0.1:11211").await;
902 client.set("delkey", b"delval", 0, 0).await.unwrap();
903 assert_eq!(
904 client.get("delkey").await.unwrap(),
905 Some(b"delval".to_vec())
906 );
907 client.delete("delkey").await.unwrap();
908 assert_eq!(client.get("delkey").await.unwrap(), None);
909 }
910
911 #[tokio::test]
912 async fn test_delete_quietly_and_get() {
913 let client = MemcacheClient::connect("127.0.0.1:11211").await;
914 client.set("delqkey", b"delqval", 0, 0).await.unwrap();
915 assert_eq!(
916 client.get("delqkey").await.unwrap(),
917 Some(b"delqval".to_vec())
918 );
919 client.delete_quietly("delqkey").await.unwrap();
920 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
922 assert_eq!(client.get("delqkey").await.unwrap(), None);
923 }
924
925 #[tokio::test]
926 async fn test_delete_multi_and_get() {
927 let client = MemcacheClient::connect("127.0.0.1:11211").await;
928 let keys = vec!["delmkey1", "delmkey2", "delmkey3"];
929 for k in &keys {
930 client.set(k, b"val", 0, 0).await.unwrap();
931 }
932 client.delete_multi(keys.clone()).await.unwrap();
933 for k in &keys {
934 assert_eq!(client.get(k).await.unwrap(), None);
935 }
936 }
937
938 #[tokio::test]
939 async fn test_delete_multi_quiet_and_get() {
940 let client = MemcacheClient::connect("127.0.0.1:11211").await;
941 let keys = vec!["delmqkey1", "delmqkey2", "delmqkey3"];
942 for k in &keys {
943 client.set(k, b"val", 0, 0).await.unwrap();
944 }
945 client.delete_multi_quietly(keys.clone()).await.unwrap();
946 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
947 for k in &keys {
948 assert_eq!(client.get(k).await.unwrap(), None);
949 }
950 }
951
952 #[tokio::test]
953 async fn bench_delete_1000() {
954 let client = MemcacheClient::connect("127.0.0.1:11211").await;
955 for i in 0..1000 {
957 let key = format!("bench_delete_1000_{i}");
958 client.set(&key, b"val", 0, 0).await.unwrap();
959 }
960 let start = std::time::Instant::now();
961 for i in 0..1000 {
962 let key = format!("bench_delete_1000_{i}");
963 client.delete(&key).await.unwrap();
964 }
965 let elapsed = start.elapsed();
966 println!("bench_delete_1000: {:?}", elapsed);
967 println!("per unit: {:?}", elapsed / 1000);
968 }
969
970 #[tokio::test]
971 async fn bench_delete_quietly_1000() {
972 let client = MemcacheClient::connect("127.0.0.1:11211").await;
973 for i in 0..1000 {
975 let key = format!("bench_delete_quietly_1000_{i}");
976 client.set(&key, b"val", 0, 0).await.unwrap();
977 }
978 let start = std::time::Instant::now();
979 for i in 0..1000 {
980 let key = format!("bench_delete_quietly_1000_{i}");
981 client.delete_quietly(&key).await.unwrap();
982 }
983 let elapsed = start.elapsed();
984 println!("bench_delete_quietly_1000: {:?}", elapsed);
985 println!("per unit: {:?}", elapsed / 1000);
986 }
987
988 #[tokio::test]
989 async fn bench_delete_multi_1000() {
990 let client = MemcacheClient::connect("127.0.0.1:11211").await;
991 let keys: Vec<_> = (0..1000)
993 .map(|i| format!("bench_delete_multi_1000_{i}"))
994 .collect();
995 for k in &keys {
996 client.set(k, b"val", 0, 0).await.unwrap();
997 }
998 let keys_ref: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
999 let start = std::time::Instant::now();
1000 client.delete_multi(keys_ref).await.unwrap();
1001 let elapsed = start.elapsed();
1002 println!("bench_delete_multi_1000: {:?}", elapsed);
1003 println!("per unit: {:?}", elapsed / 1000);
1004 }
1005
1006 #[tokio::test]
1007 async fn bench_delete_multi_quiet_1000() {
1008 let client = MemcacheClient::connect("127.0.0.1:11211").await;
1009 let keys: Vec<_> = (0..1000)
1011 .map(|i| format!("bench_delete_multi_quiet_1000_{i}"))
1012 .collect();
1013 for k in &keys {
1014 client.set(k, b"val", 0, 0).await.unwrap();
1015 }
1016 let keys_ref: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
1017 let start = std::time::Instant::now();
1018 client.delete_multi_quietly(keys_ref).await.unwrap();
1019 let elapsed = start.elapsed();
1020 println!("bench_delete_multi_quiet_1000: {:?}", elapsed);
1021 println!("per unit: {:?}", elapsed / 1000);
1022 }
1023
1024 #[tokio::test]
1025 async fn test_incr_decr() {
1026 let client = MemcacheClient::connect("127.0.0.1:11211").await;
1027
1028 client.set("incrkey", b"42", 0, 0).await.unwrap();
1029 let val = client.incr("incrkey", 8, 0, 0).await.unwrap();
1030 assert_eq!(val, Some(50));
1031 let val = client.decr("incrkey", 10, 0, 0).await.unwrap();
1032 assert_eq!(val, Some(40));
1033 let val = client.incr("missingkey", 1, 123, 0).await.unwrap();
1035 assert_eq!(val, Some(123));
1036 client.delete("incrkey").await.unwrap();
1037 client.delete("missingkey").await.unwrap();
1038 }
1039
1040 #[tokio::test]
1041 async fn test_noop() {
1042 let client = MemcacheClient::connect("127.0.0.1:11211").await;
1043 client.noop().await.unwrap();
1044 }
1045
1046 #[tokio::test]
1047 async fn bench_incr_1000() {
1048 let client = MemcacheClient::connect("127.0.0.1:11211").await;
1049 client.set("bench_incr", b"0", 0, 0).await.unwrap();
1050 let start = std::time::Instant::now();
1051 let mut last = 0;
1052 for _ in 0..1000 {
1053 last = client.incr("bench_incr", 1, 0, 0).await.unwrap().unwrap();
1054 }
1055 let elapsed = start.elapsed();
1056 println!("bench_incr_1000: {:?}, last value: {}", elapsed, last);
1057 }
1058
1059 #[tokio::test]
1060 async fn bench_decr_1000() {
1061 let client = MemcacheClient::connect("127.0.0.1:11211").await;
1062 client.set("bench_decr", b"1000", 0, 0).await.unwrap();
1063 let start = std::time::Instant::now();
1064 let mut last = 0;
1065 for _ in 0..1000 {
1066 last = client.decr("bench_decr", 1, 0, 0).await.unwrap().unwrap();
1067 }
1068 let elapsed = start.elapsed();
1069 println!("bench_decr_1000: {:?}, last value: {}", elapsed, last);
1070 }
1071
1072 #[tokio::test]
1073 async fn bench_noop_1000() {
1074 let client = MemcacheClient::connect("127.0.0.1:11211").await;
1075 let start = std::time::Instant::now();
1076 for _ in 0..1000 {
1077 client.noop().await.unwrap();
1078 }
1079 let elapsed = start.elapsed();
1080 println!("bench_noop_1000: {:?}", elapsed);
1081 }
1082}