tokio_memcached_rawl/
lib.rs

1use std::sync::Arc;
2use tokio::{
3    io::{AsyncReadExt, AsyncWriteExt},
4    net::TcpStream,
5    sync::Mutex,
6};
7
8const REQUEST_MAGIC: u8 = 0x80;
9const RESPONSE_MAGIC: u8 = 0x81;
10
11pub struct MemcacheClient {
12    stream: Arc<Mutex<TcpStream>>,
13}
14
15impl MemcacheClient {
16    pub async fn connect(addr: &str) -> Self {
17        let stream = TcpStream::connect(addr)
18            .await
19            .expect("Failed to connect to Memcached");
20        stream.set_nodelay(true).unwrap();
21        Self {
22            stream: Arc::new(Mutex::new(stream)),
23        }
24    }
25
26    #[inline(always)]
27    pub async fn set(
28        &self,
29        key: &str,
30        value: &[u8],
31        flags: u32,
32        exptime: u32,
33    ) -> Result<(), String> {
34        let mut buf = Vec::with_capacity(128);
35        encode_set_into(&mut buf, key, value, flags, exptime);
36        let mut stream = self.stream.lock().await;
37        stream
38            .write_all(&buf)
39            .await
40            .map_err(|e| format!("Write error: {e}"))?;
41        let mut header = [0u8; 24];
42        stream
43            .read_exact(&mut header)
44            .await
45            .map_err(|e| format!("Read error: {e}"))?;
46        if header[0] != RESPONSE_MAGIC {
47            return Err("Invalid response magic".to_string());
48        }
49        let status = u16::from_be_bytes([header[6], header[7]]);
50        let total_body_len =
51            u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
52        if total_body_len > 0 {
53            let mut body = vec![0u8; total_body_len];
54            stream
55                .read_exact(&mut body)
56                .await
57                .map_err(|e| format!("Read error: {e}"))?;
58        }
59        if status == 0 {
60            Ok(())
61        } else {
62            Err(format!("SET error status: {status}"))
63        }
64    }
65
66    /// Set a key without waiting for a reply (hot path, fire-and-forget).
67    #[inline(always)]
68    pub async fn set_quietly(&self, key: &str, value: &[u8]) -> Result<(), String> {
69        let mut buf = Vec::with_capacity(128);
70        encode_set_quietly_into(&mut buf, key, value);
71        let mut stream = self.stream.lock().await;
72        stream
73            .write_all(&buf)
74            .await
75            .map_err(|e| format!("Write error: {e}"))?;
76        Ok(())
77    }
78
79    #[inline(always)]
80    pub async fn set_multi<'a, I>(&self, items: I, flags: u32, exptime: u32) -> Result<(), String>
81    where
82        I: IntoIterator<Item = (&'a str, &'a [u8])>,
83    {
84        let mut stream = self.stream.lock().await;
85        let mut batch_buf = Vec::with_capacity(4096);
86        let mut count = 0;
87        for (key, value) in items.into_iter() {
88            encode_set_into(&mut batch_buf, key, value, flags, exptime);
89            count += 1; // Increment for each item
90        }
91        if !batch_buf.is_empty() {
92            stream
93                .write_all(&batch_buf)
94                .await
95                .map_err(|e| format!("Write error: {e}"))?;
96            // Each SET expects a response
97            for _ in 0..count {
98                let mut header = [0u8; 24];
99                stream
100                    .read_exact(&mut header)
101                    .await
102                    .map_err(|e| format!("Read error: {e}"))?;
103                if header[0] != RESPONSE_MAGIC {
104                    return Err("Invalid response magic".to_string());
105                }
106                let status = u16::from_be_bytes([header[6], header[7]]);
107                let total_body_len =
108                    u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
109                if total_body_len > 0 {
110                    let mut body = vec![0u8; total_body_len];
111                    stream
112                        .read_exact(&mut body)
113                        .await
114                        .map_err(|e| format!("Read error: {e}"))?;
115                }
116                if status != 0 {
117                    return Err(format!("SET error status: {status}"));
118                }
119            }
120        }
121        Ok(())
122    }
123
124    /// Batch set using SETQ for all but the last key, and SET for the last key (waits for one response).
125    #[inline(always)]
126    pub async fn set_multi_quietly<'a, I>(&self, items: I) -> Result<(), String>
127    where
128        I: IntoIterator<Item = (&'a str, &'a [u8])>,
129    {
130        let mut stream = self.stream.lock().await;
131        let mut batch_buf = Vec::with_capacity(4096);
132        let mut iter = items.into_iter().peekable();
133
134        if iter.peek().is_none() {
135            return Ok(());
136        }
137
138        while let Some((key, value)) = iter.next() {
139            if iter.peek().is_some() {
140                encode_set_quietly_into(&mut batch_buf, key, value);
141            } else {
142                encode_set_into(&mut batch_buf, key, value, 0, 0);
143            }
144        }
145
146        stream
147            .write_all(&batch_buf)
148            .await
149            .map_err(|e| format!("Write error: {e}"))?;
150
151        // Only read one response (for the last SET)
152        let mut header = [0u8; 24];
153        stream
154            .read_exact(&mut header)
155            .await
156            .map_err(|e| format!("Read error: {e}"))?;
157        if header[0] != RESPONSE_MAGIC {
158            return Err("Invalid response magic".to_string());
159        }
160        let status = u16::from_be_bytes([header[6], header[7]]);
161        if status != 0 {
162            return Err(format!("SET error status: {status}"));
163        }
164        Ok(())
165    }
166
167    #[inline(always)]
168    pub async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, String> {
169        let req = encode_get(key);
170        let mut stream = self.stream.lock().await;
171        stream
172            .write_all(&req)
173            .await
174            .map_err(|e| format!("Write error: {e}"))?;
175        let mut header = [0u8; 24];
176        stream
177            .read_exact(&mut header)
178            .await
179            .map_err(|e| format!("Read error: {e}"))?;
180        if header[0] != RESPONSE_MAGIC {
181            return Err("Invalid response magic".to_string());
182        }
183        let status = u16::from_be_bytes([header[6], header[7]]);
184        let total_body_len =
185            u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
186        let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
187        let extras_len = header[4] as usize;
188        let mut body = vec![0u8; total_body_len];
189        if total_body_len > 0 {
190            stream
191                .read_exact(&mut body)
192                .await
193                .map_err(|e| format!("Read error: {e}"))?;
194        }
195        if status == 0 {
196            let value = body[extras_len + key_len..].to_vec();
197            Ok(Some(value))
198        } else if status == 1 {
199            Ok(None)
200        } else {
201            Err(format!("GET error status: {status}"))
202        }
203    }
204
205    #[inline(always)]
206    pub async fn get_multi<'a, I>(&self, keys: I) -> Result<Vec<(String, Option<Vec<u8>>)>, String>
207    where
208        I: IntoIterator<Item = &'a str>,
209    {
210        let mut stream = self.stream.lock().await;
211        let mut batch_buf = Vec::with_capacity(4096);
212        let mut key_vec = Vec::new();
213        for key in keys.into_iter() {
214            encode_get_into(&mut batch_buf, key);
215            key_vec.push(key.to_string());
216        }
217        if !batch_buf.is_empty() {
218            stream
219                .write_all(&batch_buf)
220                .await
221                .map_err(|e| format!("Write error: {e}"))?;
222        }
223        let mut results = Vec::new();
224        for key in key_vec {
225            let mut header = [0u8; 24];
226            stream
227                .read_exact(&mut header)
228                .await
229                .map_err(|e| format!("Read error: {e}"))?;
230            if header[0] != RESPONSE_MAGIC {
231                return Err("Invalid response magic".to_string());
232            }
233            let status = u16::from_be_bytes([header[6], header[7]]);
234            let total_body_len =
235                u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
236            let key_len = u16::from_be_bytes([header[2], header[3]]) as usize;
237            let extras_len = header[4] as usize;
238            let mut body = vec![0u8; total_body_len];
239            if total_body_len > 0 {
240                stream
241                    .read_exact(&mut body)
242                    .await
243                    .map_err(|e| format!("Read error: {e}"))?;
244            }
245            if status == 0 {
246                let value = body[extras_len + key_len..].to_vec();
247                results.push((key, Some(value)));
248            } else if status == 1 {
249                results.push((key, None));
250            } else {
251                return Err(format!("GET error status: {status}"));
252            }
253        }
254        Ok(results)
255    }
256
257    /// Delete a single key, waits for response.
258    #[inline(always)]
259    pub async fn delete(&self, key: &str) -> Result<(), String> {
260        let mut buf = Vec::with_capacity(64);
261        encode_delete_into(&mut buf, key);
262        let mut stream = self.stream.lock().await;
263        stream
264            .write_all(&buf)
265            .await
266            .map_err(|e| format!("Write error: {e}"))?;
267        let mut header = [0u8; 24];
268        stream
269            .read_exact(&mut header)
270            .await
271            .map_err(|e| format!("Read error: {e}"))?;
272        if header[0] != RESPONSE_MAGIC {
273            return Err("Invalid response magic".to_string());
274        }
275        let status = u16::from_be_bytes([header[6], header[7]]);
276        if status == 0 {
277            Ok(())
278        } else if status == 1 {
279            // Not found is not an error for delete
280            Ok(())
281        } else {
282            Err(format!("DELETE error status: {status}"))
283        }
284    }
285
286    /// Delete a single key, quietly (no response).
287    #[inline(always)]
288    pub async fn delete_quietly(&self, key: &str) -> Result<(), String> {
289        let mut buf = Vec::with_capacity(64);
290        encode_delete_quiet_into(&mut buf, key);
291        let mut stream = self.stream.lock().await;
292        stream
293            .write_all(&buf)
294            .await
295            .map_err(|e| format!("Write error: {e}"))?;
296        Ok(())
297    }
298
299    /// Delete multiple keys, waits for response for each.
300    #[inline(always)]
301    pub async fn delete_multi<'a, I>(&self, keys: I) -> Result<(), String>
302    where
303        I: IntoIterator<Item = &'a str>,
304    {
305        let mut stream = self.stream.lock().await;
306        let mut batch_buf = Vec::with_capacity(4096);
307        let mut count = 0;
308        for key in keys.into_iter() {
309            encode_delete_into(&mut batch_buf, key);
310            count += 1;
311        }
312        if !batch_buf.is_empty() {
313            stream
314                .write_all(&batch_buf)
315                .await
316                .map_err(|e| format!("Write error: {e}"))?;
317            for _ in 0..count {
318                let mut header = [0u8; 24];
319                stream
320                    .read_exact(&mut header)
321                    .await
322                    .map_err(|e| format!("Read error: {e}"))?;
323                if header[0] != RESPONSE_MAGIC {
324                    return Err("Invalid response magic".to_string());
325                }
326                let status = u16::from_be_bytes([header[6], header[7]]);
327                if status != 0 && status != 1 {
328                    return Err(format!("DELETE error status: {status}"));
329                }
330            }
331        }
332        Ok(())
333    }
334
335    /// Delete multiple keys, quietly (all but last as DELQ, last as DEL).
336    #[inline(always)]
337    pub async fn delete_multi_quietly<'a, I>(&self, keys: I) -> Result<(), String>
338    where
339        I: IntoIterator<Item = &'a str>,
340    {
341        let mut stream = self.stream.lock().await;
342        let mut batch_buf = Vec::with_capacity(4096);
343        let mut iter = keys.into_iter().peekable();
344
345        if iter.peek().is_none() {
346            return Ok(());
347        }
348
349        while let Some(key) = iter.next() {
350            if iter.peek().is_some() {
351                encode_delete_quiet_into(&mut batch_buf, key);
352            } else {
353                encode_delete_into(&mut batch_buf, key);
354            }
355        }
356
357        stream
358            .write_all(&batch_buf)
359            .await
360            .map_err(|e| format!("Write error: {e}"))?;
361
362        // Only read one response (for the last DEL)
363        let mut header = [0u8; 24];
364        stream
365            .read_exact(&mut header)
366            .await
367            .map_err(|e| format!("Read error: {e}"))?;
368        if header[0] != RESPONSE_MAGIC {
369            return Err("Invalid response magic".to_string());
370        }
371        let status = u16::from_be_bytes([header[6], header[7]]);
372        if status != 0 && status != 1 {
373            return Err(format!("DELETE error status: {status}"));
374        }
375        Ok(())
376    }
377
378    /// Increment a key by `delta`. Returns the new value or None if not found.
379    #[inline(always)]
380    pub async fn incr(
381        &self,
382        key: &str,
383        delta: u64,
384        initial: u64,
385        exptime: u32,
386    ) -> Result<Option<u64>, String> {
387        let mut buf = Vec::with_capacity(64);
388        encode_incr_decr_into(&mut buf, key, delta, initial, exptime, true);
389        let mut stream = self.stream.lock().await;
390        stream
391            .write_all(&buf)
392            .await
393            .map_err(|e| format!("Write error: {e}"))?;
394        let mut header = [0u8; 24];
395        stream
396            .read_exact(&mut header)
397            .await
398            .map_err(|e| format!("Read error: {e}"))?;
399        if header[0] != RESPONSE_MAGIC {
400            return Err("Invalid response magic".to_string());
401        }
402        let status = u16::from_be_bytes([header[6], header[7]]);
403        let total_body_len =
404            u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
405        let mut body = vec![0u8; total_body_len];
406        if total_body_len > 0 {
407            stream
408                .read_exact(&mut body)
409                .await
410                .map_err(|e| format!("Read error: {e}"))?;
411        }
412        if status == 0 {
413            // Body is 8 bytes: new value
414            let val = u64::from_be_bytes(body[..8].try_into().unwrap());
415            Ok(Some(val))
416        } else if status == 1 {
417            Ok(None)
418        } else {
419            Err(format!("INCR error status: {status}"))
420        }
421    }
422
423    /// Decrement a key by `delta`. Returns the new value or None if not found.
424    #[inline(always)]
425    pub async fn decr(
426        &self,
427        key: &str,
428        delta: u64,
429        initial: u64,
430        exptime: u32,
431    ) -> Result<Option<u64>, String> {
432        let mut buf = Vec::with_capacity(64);
433        encode_incr_decr_into(&mut buf, key, delta, initial, exptime, false);
434        let mut stream = self.stream.lock().await;
435        stream
436            .write_all(&buf)
437            .await
438            .map_err(|e| format!("Write error: {e}"))?;
439        let mut header = [0u8; 24];
440        stream
441            .read_exact(&mut header)
442            .await
443            .map_err(|e| format!("Read error: {e}"))?;
444        if header[0] != RESPONSE_MAGIC {
445            return Err("Invalid response magic".to_string());
446        }
447        let status = u16::from_be_bytes([header[6], header[7]]);
448        let total_body_len =
449            u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
450        let mut body = vec![0u8; total_body_len];
451        if total_body_len > 0 {
452            stream
453                .read_exact(&mut body)
454                .await
455                .map_err(|e| format!("Read error: {e}"))?;
456        }
457        if status == 0 {
458            let val = u64::from_be_bytes(body[..8].try_into().unwrap());
459            Ok(Some(val))
460        } else if status == 1 {
461            Ok(None)
462        } else {
463            Err(format!("DECR error status: {status}"))
464        }
465    }
466
467    /// Send a NOOP command (useful for pipelining).
468    #[inline(always)]
469    pub async fn noop(&self) -> Result<(), String> {
470        let mut buf = Vec::with_capacity(24);
471        encode_noop_into(&mut buf);
472        let mut stream = self.stream.lock().await;
473        stream
474            .write_all(&buf)
475            .await
476            .map_err(|e| format!("Write error: {e}"))?;
477        let mut header = [0u8; 24];
478        stream
479            .read_exact(&mut header)
480            .await
481            .map_err(|e| format!("Read error: {e}"))?;
482        if header[0] != RESPONSE_MAGIC {
483            return Err("Invalid response magic".to_string());
484        }
485        let status = u16::from_be_bytes([header[6], header[7]]);
486        if status == 0 {
487            Ok(())
488        } else {
489            Err(format!("NOOP error status: {status}"))
490        }
491    }
492}
493
494// --- Binary protocol encoding helpers ---
495
496fn encode_get(key: &str) -> Vec<u8> {
497    let mut buf = Vec::with_capacity(24 + key.len());
498    encode_get_into(&mut buf, key);
499    buf
500}
501
502fn encode_get_into(buf: &mut Vec<u8>, key: &str) {
503    buf.reserve(24 + key.len());
504    let key_bytes = key.as_bytes();
505    let total_body_len = key_bytes.len() as u32;
506    buf.push(REQUEST_MAGIC); // magic
507    buf.push(0x00); // opcode: GET
508    buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); // key length
509    buf.push(0); // extras length
510    buf.push(0); // data type
511    buf.extend_from_slice(&[0, 0]); // vbucket id
512    buf.extend_from_slice(&total_body_len.to_be_bytes()); // total body length
513    buf.extend_from_slice(&[0, 0, 0, 0]); // opaque
514    buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); // CAS
515    buf.extend_from_slice(key_bytes);
516}
517
518fn encode_set_into(buf: &mut Vec<u8>, key: &str, value: &[u8], flags: u32, exptime: u32) {
519    buf.reserve(24 + key.len() + value.len() + 8);
520    let key_bytes = key.as_bytes();
521    let extras_len = 8;
522    let total_body_len = (extras_len + key_bytes.len() + value.len()) as u32;
523    buf.push(REQUEST_MAGIC);
524    buf.push(0x01); // SET
525    buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
526    buf.push(extras_len as u8);
527    buf.push(0);
528    buf.extend_from_slice(&[0, 0]);
529    buf.extend_from_slice(&total_body_len.to_be_bytes());
530    buf.extend_from_slice(&[0, 0, 0, 0]);
531    buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]);
532    buf.extend_from_slice(&flags.to_be_bytes());
533    buf.extend_from_slice(&exptime.to_be_bytes());
534    buf.extend_from_slice(key_bytes);
535    buf.extend_from_slice(value);
536}
537
538fn encode_set_quietly_into(buf: &mut Vec<u8>, key: &str, value: &[u8]) {
539    buf.reserve(24 + key.len() + value.len() + 8);
540    let key_bytes = key.as_bytes();
541    let flags: u32 = 0;
542    let exptime: u32 = 0;
543    let extras_len = 8;
544    let total_body_len = (extras_len + key_bytes.len() + value.len()) as u32;
545    buf.push(REQUEST_MAGIC);
546    buf.push(0x11); // SETQ
547    buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
548    buf.push(extras_len as u8);
549    buf.push(0);
550    buf.extend_from_slice(&[0, 0]);
551    buf.extend_from_slice(&total_body_len.to_be_bytes());
552    buf.extend_from_slice(&[0, 0, 0, 0]);
553    buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]);
554    buf.extend_from_slice(&flags.to_be_bytes());
555    buf.extend_from_slice(&exptime.to_be_bytes());
556    buf.extend_from_slice(key_bytes);
557    buf.extend_from_slice(value);
558}
559
560fn encode_delete_into(buf: &mut Vec<u8>, key: &str) {
561    buf.reserve(24 + key.len());
562    let key_bytes = key.as_bytes();
563    let total_body_len = key_bytes.len() as u32;
564    buf.push(REQUEST_MAGIC); // magic
565    buf.push(0x04); // opcode: DELETE
566    buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); // key length
567    buf.push(0); // extras length
568    buf.push(0); // data type
569    buf.extend_from_slice(&[0, 0]); // vbucket id
570    buf.extend_from_slice(&total_body_len.to_be_bytes()); // total body length
571    buf.extend_from_slice(&[0, 0, 0, 0]); // opaque
572    buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); // CAS
573    buf.extend_from_slice(key_bytes);
574}
575
576fn encode_delete_quiet_into(buf: &mut Vec<u8>, key: &str) {
577    buf.reserve(24 + key.len());
578    let key_bytes = key.as_bytes();
579    let total_body_len = key_bytes.len() as u32;
580    buf.push(REQUEST_MAGIC); // magic
581    buf.push(0x14); // opcode: DELETEQ
582    buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); // key length
583    buf.push(0); // extras length
584    buf.push(0); // data type
585    buf.extend_from_slice(&[0, 0]); // vbucket id
586    buf.extend_from_slice(&total_body_len.to_be_bytes()); // total body length
587    buf.extend_from_slice(&[0, 0, 0, 0]); // opaque
588    buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); // CAS
589    buf.extend_from_slice(key_bytes);
590}
591
592fn encode_incr_decr_into(
593    buf: &mut Vec<u8>,
594    key: &str,
595    delta: u64,
596    initial: u64,
597    exptime: u32,
598    incr: bool,
599) {
600    buf.reserve(24 + key.len() + 20);
601    let key_bytes = key.as_bytes();
602    let extras_len = 20;
603    let total_body_len = (extras_len + key_bytes.len()) as u32;
604    buf.push(REQUEST_MAGIC);
605    buf.push(if incr { 0x05 } else { 0x06 }); // INCR or DECR
606    buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
607    buf.push(extras_len as u8);
608    buf.push(0);
609    buf.extend_from_slice(&[0, 0]);
610    buf.extend_from_slice(&total_body_len.to_be_bytes());
611    buf.extend_from_slice(&[0, 0, 0, 0]);
612    buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]);
613    buf.extend_from_slice(&delta.to_be_bytes());
614    buf.extend_from_slice(&initial.to_be_bytes());
615    buf.extend_from_slice(&exptime.to_be_bytes());
616    buf.extend_from_slice(key_bytes);
617}
618
619fn encode_noop_into(buf: &mut Vec<u8>) {
620    buf.reserve(24);
621    buf.push(REQUEST_MAGIC);
622    buf.push(0x0a); // NOOP
623    buf.extend_from_slice(&[0, 0]); // key length
624    buf.push(0); // extras length
625    buf.push(0); // data type
626    buf.extend_from_slice(&[0, 0]); // vbucket id
627    buf.extend_from_slice(&0u32.to_be_bytes()); // total body length
628    buf.extend_from_slice(&[0, 0, 0, 0]); // opaque
629    buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); // CAS
630}
631
632#[cfg(test)]
633mod tests {
634    use super::*;
635
636    #[tokio::test]
637    async fn test_set_and_get() {
638        let client = MemcacheClient::connect("127.0.0.1:11211").await;
639        client.set("foo", b"bar", 0, 0).await.expect("SET failed");
640        let val = client.get("foo").await.expect("GET failed");
641        assert_eq!(val, Some(b"bar".to_vec()));
642    }
643
644    #[tokio::test]
645    async fn test_get_missing() {
646        let client = MemcacheClient::connect("127.0.0.1:11211").await;
647        let val = client.get("no_such_key").await.expect("GET failed");
648        assert_eq!(val, None);
649    }
650
651    #[tokio::test]
652    async fn test_set_multi_and_get_single_multiple_times() {
653        let client = MemcacheClient::connect("127.0.0.1:11211").await;
654        client
655            .set_multi(
656                vec![("foo", b"bar" as &[u8]), ("baz", b"qux"), ("num", b"42")],
657                0,
658                0,
659            )
660            .await
661            .expect("set_multi failed");
662
663        assert_eq!(client.get("foo").await.unwrap(), Some(b"bar".to_vec()));
664        assert_eq!(client.get("baz").await.unwrap(), Some(b"qux".to_vec()));
665        assert_eq!(client.get("num").await.unwrap(), Some(b"42".to_vec()));
666    }
667
668    #[tokio::test]
669    async fn test_get_multi() {
670        let client = MemcacheClient::connect("127.0.0.1:11211").await;
671        client.set("foo", b"bar", 0, 0).await.unwrap();
672        client.set("baz", b"qux", 0, 0).await.unwrap();
673
674        let results = client
675            .get_multi(vec!["foo", "baz", "missing"])
676            .await
677            .expect("get_multi failed");
678
679        assert_eq!(results[0], ("foo".to_string(), Some(b"bar".to_vec())));
680        assert_eq!(results[1], ("baz".to_string(), Some(b"qux".to_vec())));
681        assert_eq!(results[2], ("missing".to_string(), None));
682    }
683
684    #[tokio::test]
685    async fn test_set_multi_get_multi() {
686        let client = MemcacheClient::connect("127.0.0.1:11211").await;
687        client
688            .set_multi(
689                vec![("foo", b"bar" as &[u8]), ("baz", b"qux"), ("num", b"42")],
690                0,
691                0,
692            )
693            .await
694            .expect("set_multi failed");
695
696        let results = client
697            .get_multi(vec!["foo", "baz", "num"])
698            .await
699            .expect("get_multi failed");
700
701        assert_eq!(results[0], ("foo".to_string(), Some(b"bar".to_vec())));
702        assert_eq!(results[1], ("baz".to_string(), Some(b"qux".to_vec())));
703        assert_eq!(results[2], ("num".to_string(), Some(b"42".to_vec())));
704    }
705
706    #[tokio::test]
707    async fn bench_set_1000() {
708        let client = MemcacheClient::connect("127.0.0.1:11211").await;
709        let start = std::time::Instant::now();
710        for i in 0..1000 {
711            let key = format!("bench_set_1000_{i}");
712            client.set(&key, b"val", 0, 0).await.unwrap();
713        }
714        let elapsed = start.elapsed();
715        println!("bench_set_1000: {:?}", elapsed);
716        println!("per unit: {:?}", elapsed / 1000);
717    }
718
719    #[tokio::test]
720    async fn bench_get_1000() {
721        let client = MemcacheClient::connect("127.0.0.1:11211").await;
722        // Pre-populate
723        for i in 0..1000 {
724            let key = format!("bench_get_1000_{i}");
725            client.set(&key, b"val", 0, 0).await.unwrap();
726        }
727        let start = std::time::Instant::now();
728        for i in 0..1000 {
729            let key = format!("bench_get_1000_{i}");
730            let val = client.get(&key).await.unwrap();
731            assert_eq!(val, Some(b"val".to_vec()));
732        }
733        let elapsed = start.elapsed();
734        println!("bench_get_1000: {:?}", elapsed);
735        println!("per unit: {:?}", elapsed / 1000);
736    }
737
738    #[tokio::test]
739    async fn bench_set_multi_1000() {
740        let client = MemcacheClient::connect("127.0.0.1:11211").await;
741        let items: Vec<_> = (0..100)
742            .map(|i| (format!("bench_set_multi_1000_{i}"), b"val" as &[u8]))
743            .collect();
744        let items_ref: Vec<(&str, &[u8])> = items.iter().map(|(k, v)| (k.as_str(), *v)).collect();
745        let start = std::time::Instant::now();
746        client.set_multi(items_ref, 0, 0).await.unwrap();
747        let elapsed = start.elapsed();
748        println!("bench_set_multi_1000: {:?}", elapsed);
749        println!("per unit: {:?}", elapsed / 1000);
750    }
751
752    #[tokio::test]
753    async fn bench_get_multi_1000() {
754        let client = MemcacheClient::connect("127.0.0.1:11211").await;
755        // Pre-populate
756        let items: Vec<_> = (0..1000)
757            .map(|i| (format!("bench_get_multi_1000_{i}"), b"val" as &[u8]))
758            .collect();
759        let items_ref: Vec<(&str, &[u8])> = items.iter().map(|(k, v)| (k.as_str(), *v)).collect();
760        client.set_multi(items_ref, 0, 0).await.unwrap();
761
762        let keys: Vec<_> = (0..100)
763            .map(|i| format!("bench_get_multi_1000_{i}"))
764            .collect();
765        let keys_ref: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
766
767        let start = std::time::Instant::now();
768        let results = client.get_multi(keys_ref).await.unwrap();
769        for (_k, v) in results {
770            assert_eq!(v, Some(b"val".to_vec()));
771        }
772        let elapsed = start.elapsed();
773        println!("bench_get_multi_1000: {:?}", elapsed);
774        println!("per unit: {:?}", elapsed / 1000);
775    }
776
777    #[tokio::test]
778    async fn test_set_quietly_and_get() {
779        let client = MemcacheClient::connect("127.0.0.1:11211").await;
780        client.set_quietly("quiet_key", b"quiet_val").await.unwrap();
781        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
782        let val = client.get("quiet_key").await.unwrap();
783        assert_eq!(val, Some(b"quiet_val".to_vec()));
784    }
785
786    #[tokio::test]
787    async fn bench_set_quietly_1000() {
788        let client = MemcacheClient::connect("127.0.0.1:11211").await;
789        let start = std::time::Instant::now();
790        for i in 0..1000 {
791            let key = format!("bench_set_quietly_1000_{i}");
792            client.set_quietly(&key, b"val").await.unwrap();
793        }
794        let elapsed = start.elapsed();
795        println!("bench_set_quietly_1000: {:?}", elapsed);
796        println!("per unit: {:?}", elapsed / 1000);
797    }
798
799    #[tokio::test]
800    async fn test_set_multi_quiet_and_get() {
801        let client = MemcacheClient::connect("127.0.0.1:11211").await;
802        client
803            .set_multi_quietly(vec![
804                ("quiet_batch_foo", b"bar" as &[u8]),
805                ("quiet_batch_baz", b"qux"),
806                ("quiet_batch_num", b"42"),
807            ])
808            .await
809            .expect("set_multi_quiet failed");
810        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
811        assert_eq!(
812            client.get("quiet_batch_foo").await.unwrap(),
813            Some(b"bar".to_vec())
814        );
815        assert_eq!(
816            client.get("quiet_batch_baz").await.unwrap(),
817            Some(b"qux".to_vec())
818        );
819        assert_eq!(
820            client.get("quiet_batch_num").await.unwrap(),
821            Some(b"42".to_vec())
822        );
823    }
824
825    #[tokio::test]
826    async fn test_set_multi_quiet_empty() {
827        let client = MemcacheClient::connect("127.0.0.1:11211").await;
828        client
829            .set_multi_quietly(Vec::<(&str, &[u8])>::new())
830            .await
831            .unwrap();
832    }
833
834    #[tokio::test]
835    async fn test_set_multi_quiet_single() {
836        let client = MemcacheClient::connect("127.0.0.1:11211").await;
837        client
838            .set_multi_quietly(vec![("quiet_batch_single", b"only" as &[u8])])
839            .await
840            .expect("set_multi_quiet failed");
841        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
842        assert_eq!(
843            client.get("quiet_batch_single").await.unwrap(),
844            Some(b"only".to_vec())
845        );
846    }
847
848    #[tokio::test]
849    async fn bench_set_multi_quiet_1000() {
850        let client = MemcacheClient::connect("127.0.0.1:11211").await;
851        let items: Vec<_> = (0..1000)
852            .map(|i| (format!("bench_set_multi_quiet_1000_{i}"), b"val" as &[u8]))
853            .collect();
854        let items_ref: Vec<(&str, &[u8])> = items.iter().map(|(k, v)| (k.as_str(), *v)).collect();
855        let start = std::time::Instant::now();
856        client.set_multi_quietly(items_ref).await.unwrap();
857        let elapsed = start.elapsed();
858        println!("bench_set_multi_quiet_1000: {:?}", elapsed);
859        println!("per unit: {:?}", elapsed / 1000);
860    }
861
862    #[tokio::test]
863    async fn bench_set_multi_quiet_three_pods_1000_times() {
864        let client = MemcacheClient::connect("127.0.0.1:11211").await;
865
866        // Prepare 3 pods with fixed trade_side and entry_price values
867        let pods = [
868            (0u8, 12345678f64, "bench_ts_key_0", "bench_ep_key_0"),
869            (1u8, 22345678f64, "bench_ts_key_1", "bench_ep_key_1"),
870            (2u8, 32345678f64, "bench_ts_key_2", "bench_ep_key_2"),
871        ];
872
873        let start = std::time::Instant::now();
874
875        for _i in 0..1000 {
876            let mut vec_to_persist: Vec<(&str, Vec<u8>)> = Vec::with_capacity(6);
877            for (trade_side, entry_price, trade_side_key, entry_price_key) in &pods {
878                vec_to_persist.push((*trade_side_key, trade_side.to_le_bytes().to_vec()));
879                vec_to_persist.push((*entry_price_key, entry_price.to_le_bytes().to_vec()));
880            }
881            let vec_to_persist_refs: Vec<(&str, &[u8])> = vec_to_persist
882                .iter()
883                .map(|(k, v)| (*k, v.as_slice()))
884                .collect();
885
886            client.set_multi_quietly(vec_to_persist_refs).await.unwrap();
887        }
888
889        let elapsed = start.elapsed();
890        println!(
891            "bench_set_multi_quiet_three_pods_1000_times: {:?} ({} sets of 3 pods, {} keys total)",
892            elapsed,
893            1000,
894            1000 * 6
895        );
896        println!("per unit: {:?}", elapsed / 1000);
897    }
898
899    #[tokio::test]
900    async fn test_delete_and_get() {
901        let client = MemcacheClient::connect("127.0.0.1:11211").await;
902        client.set("delkey", b"delval", 0, 0).await.unwrap();
903        assert_eq!(
904            client.get("delkey").await.unwrap(),
905            Some(b"delval".to_vec())
906        );
907        client.delete("delkey").await.unwrap();
908        assert_eq!(client.get("delkey").await.unwrap(), None);
909    }
910
911    #[tokio::test]
912    async fn test_delete_quietly_and_get() {
913        let client = MemcacheClient::connect("127.0.0.1:11211").await;
914        client.set("delqkey", b"delqval", 0, 0).await.unwrap();
915        assert_eq!(
916            client.get("delqkey").await.unwrap(),
917            Some(b"delqval".to_vec())
918        );
919        client.delete_quietly("delqkey").await.unwrap();
920        // Give memcached a moment to process
921        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
922        assert_eq!(client.get("delqkey").await.unwrap(), None);
923    }
924
925    #[tokio::test]
926    async fn test_delete_multi_and_get() {
927        let client = MemcacheClient::connect("127.0.0.1:11211").await;
928        let keys = vec!["delmkey1", "delmkey2", "delmkey3"];
929        for k in &keys {
930            client.set(k, b"val", 0, 0).await.unwrap();
931        }
932        client.delete_multi(keys.clone()).await.unwrap();
933        for k in &keys {
934            assert_eq!(client.get(k).await.unwrap(), None);
935        }
936    }
937
938    #[tokio::test]
939    async fn test_delete_multi_quiet_and_get() {
940        let client = MemcacheClient::connect("127.0.0.1:11211").await;
941        let keys = vec!["delmqkey1", "delmqkey2", "delmqkey3"];
942        for k in &keys {
943            client.set(k, b"val", 0, 0).await.unwrap();
944        }
945        client.delete_multi_quietly(keys.clone()).await.unwrap();
946        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
947        for k in &keys {
948            assert_eq!(client.get(k).await.unwrap(), None);
949        }
950    }
951
952    #[tokio::test]
953    async fn bench_delete_1000() {
954        let client = MemcacheClient::connect("127.0.0.1:11211").await;
955        // Pre-populate
956        for i in 0..1000 {
957            let key = format!("bench_delete_1000_{i}");
958            client.set(&key, b"val", 0, 0).await.unwrap();
959        }
960        let start = std::time::Instant::now();
961        for i in 0..1000 {
962            let key = format!("bench_delete_1000_{i}");
963            client.delete(&key).await.unwrap();
964        }
965        let elapsed = start.elapsed();
966        println!("bench_delete_1000: {:?}", elapsed);
967        println!("per unit: {:?}", elapsed / 1000);
968    }
969
970    #[tokio::test]
971    async fn bench_delete_quietly_1000() {
972        let client = MemcacheClient::connect("127.0.0.1:11211").await;
973        // Pre-populate
974        for i in 0..1000 {
975            let key = format!("bench_delete_quietly_1000_{i}");
976            client.set(&key, b"val", 0, 0).await.unwrap();
977        }
978        let start = std::time::Instant::now();
979        for i in 0..1000 {
980            let key = format!("bench_delete_quietly_1000_{i}");
981            client.delete_quietly(&key).await.unwrap();
982        }
983        let elapsed = start.elapsed();
984        println!("bench_delete_quietly_1000: {:?}", elapsed);
985        println!("per unit: {:?}", elapsed / 1000);
986    }
987
988    #[tokio::test]
989    async fn bench_delete_multi_1000() {
990        let client = MemcacheClient::connect("127.0.0.1:11211").await;
991        // Pre-populate
992        let keys: Vec<_> = (0..1000)
993            .map(|i| format!("bench_delete_multi_1000_{i}"))
994            .collect();
995        for k in &keys {
996            client.set(k, b"val", 0, 0).await.unwrap();
997        }
998        let keys_ref: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
999        let start = std::time::Instant::now();
1000        client.delete_multi(keys_ref).await.unwrap();
1001        let elapsed = start.elapsed();
1002        println!("bench_delete_multi_1000: {:?}", elapsed);
1003        println!("per unit: {:?}", elapsed / 1000);
1004    }
1005
1006    #[tokio::test]
1007    async fn bench_delete_multi_quiet_1000() {
1008        let client = MemcacheClient::connect("127.0.0.1:11211").await;
1009        // Pre-populate
1010        let keys: Vec<_> = (0..1000)
1011            .map(|i| format!("bench_delete_multi_quiet_1000_{i}"))
1012            .collect();
1013        for k in &keys {
1014            client.set(k, b"val", 0, 0).await.unwrap();
1015        }
1016        let keys_ref: Vec<&str> = keys.iter().map(|k| k.as_str()).collect();
1017        let start = std::time::Instant::now();
1018        client.delete_multi_quietly(keys_ref).await.unwrap();
1019        let elapsed = start.elapsed();
1020        println!("bench_delete_multi_quiet_1000: {:?}", elapsed);
1021        println!("per unit: {:?}", elapsed / 1000);
1022    }
1023
1024    #[tokio::test]
1025    async fn test_incr_decr() {
1026        let client = MemcacheClient::connect("127.0.0.1:11211").await;
1027
1028        client.set("incrkey", b"42", 0, 0).await.unwrap();
1029        let val = client.incr("incrkey", 8, 0, 0).await.unwrap();
1030        assert_eq!(val, Some(50));
1031        let val = client.decr("incrkey", 10, 0, 0).await.unwrap();
1032        assert_eq!(val, Some(40));
1033        // Not found
1034        let val = client.incr("missingkey", 1, 123, 0).await.unwrap();
1035        assert_eq!(val, Some(123));
1036        client.delete("incrkey").await.unwrap();
1037        client.delete("missingkey").await.unwrap();
1038    }
1039
1040    #[tokio::test]
1041    async fn test_noop() {
1042        let client = MemcacheClient::connect("127.0.0.1:11211").await;
1043        client.noop().await.unwrap();
1044    }
1045
1046    #[tokio::test]
1047    async fn bench_incr_1000() {
1048        let client = MemcacheClient::connect("127.0.0.1:11211").await;
1049        client.set("bench_incr", b"0", 0, 0).await.unwrap();
1050        let start = std::time::Instant::now();
1051        let mut last = 0;
1052        for _ in 0..1000 {
1053            last = client.incr("bench_incr", 1, 0, 0).await.unwrap().unwrap();
1054        }
1055        let elapsed = start.elapsed();
1056        println!("bench_incr_1000: {:?}, last value: {}", elapsed, last);
1057    }
1058
1059    #[tokio::test]
1060    async fn bench_decr_1000() {
1061        let client = MemcacheClient::connect("127.0.0.1:11211").await;
1062        client.set("bench_decr", b"1000", 0, 0).await.unwrap();
1063        let start = std::time::Instant::now();
1064        let mut last = 0;
1065        for _ in 0..1000 {
1066            last = client.decr("bench_decr", 1, 0, 0).await.unwrap().unwrap();
1067        }
1068        let elapsed = start.elapsed();
1069        println!("bench_decr_1000: {:?}, last value: {}", elapsed, last);
1070    }
1071
1072    #[tokio::test]
1073    async fn bench_noop_1000() {
1074        let client = MemcacheClient::connect("127.0.0.1:11211").await;
1075        let start = std::time::Instant::now();
1076        for _ in 0..1000 {
1077            client.noop().await.unwrap();
1078        }
1079        let elapsed = start.elapsed();
1080        println!("bench_noop_1000: {:?}", elapsed);
1081    }
1082}