memcached/client/
mod.rs

1// Copyright (c) 2015 Y. T. Chung <zonyitoo@gmail.com>
2// Licensed under the Apache License, Version 2.0
3// <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
6// at your option. All files in the project carrying such
7// notice may not be copied, modified, or distributed except
8// according to those terms.
9
10//! Memcached client
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, HashMap};
14use std::io;
15use std::net::TcpStream;
16use std::ops::Deref;
17use std::path::Path;
18use std::rc::Rc;
19
20use conhash::{ConsistentHash, Node};
21
22use bufstream::BufStream;
23
24#[cfg(unix)]
25use unix_socket::UnixStream;
26
27use crate::proto::{self, AuthResponse, MemCachedResult};
28use crate::proto::{CasOperation, MultiOperation, NoReplyOperation, Operation, Proto};
29
30struct Sasl<'a> {
31    username: &'a str,
32    password: &'a str,
33}
34
35struct Server {
36    pub proto: Box<dyn Proto + Send>,
37    addr: String,
38}
39
40impl Server {
41    fn connect(
42        addr: String,
43        protocol: proto::ProtoType,
44        o_sasl: &Option<Sasl>,
45    ) -> io::Result<Server> {
46        let proto = {
47            let mut split = addr.split("://");
48            match protocol {
49                proto::ProtoType::Binary => match (split.next(), split.next()) {
50                    (Some("tcp"), Some(addr)) => {
51                        let stream = TcpStream::connect(addr)?;
52                        stream.set_nodelay(true)?;
53                        let mut proto = Box::new(proto::BinaryProto::new(BufStream::new(stream)))
54                            as Box<dyn Proto + Send>;
55                        if let Some(sasl) = o_sasl {
56                            let auth_str = format!("\x00{}\x00{}", sasl.username, sasl.password);
57                            match proto.auth_start("PLAIN", auth_str.as_bytes()) {
58                                Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
59                                Ok(AuthResponse::Succeeded) => (),
60                                Ok(resp) => {
61                                    let msg =
62                                        format!("SASL auth failed with AuthResponse: {:?}", resp);
63                                    return Err(io::Error::new(io::ErrorKind::Other, msg));
64                                }
65                            }
66                        }
67                        proto
68                    }
69                    #[cfg(unix)]
70                    (Some("unix"), Some(addr)) => {
71                        let stream = UnixStream::connect(&Path::new(addr))?;
72                        Box::new(proto::BinaryProto::new(BufStream::new(stream)))
73                            as Box<dyn Proto + Send>
74                    }
75                    (Some(prot), _) => {
76                        panic!("Unsupported protocol: {}", prot);
77                    }
78                    _ => panic!("Malformed address"),
79                },
80            }
81        };
82        Ok(Server { proto, addr })
83    }
84}
85
86#[derive(Clone)]
87struct ServerRef(Rc<RefCell<Server>>);
88
89impl Node for ServerRef {
90    fn name(&self) -> String {
91        self.0.borrow().addr.clone()
92    }
93}
94
95impl Deref for ServerRef {
96    type Target = Rc<RefCell<Server>>;
97
98    fn deref(&self) -> &Rc<RefCell<Server>> {
99        &self.0
100    }
101}
102
103// impl Clone for Server {
104//     fn clone(&self) -> Server {
105//         Server { proto: self.proto.clone() }
106//     }
107// }
108
109/// Memcached client
110///
111/// ```ignore
112/// use memcached::client::{Client};
113/// use memcached::proto::{CasOperation, MultiOperation, NoReplyOperation, Operation, ProtoType};
114///
115/// let mut client = Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
116///
117/// client.set(b"Foo", b"Bar", 0xdeadbeef, 2).unwrap();
118/// let (value, flags) = client.get(b"Foo").unwrap();
119/// assert_eq!(&value[..], b"Bar");
120/// assert_eq!(flags, 0xdeadbeef);
121///
122/// client.set_noreply(b"key:dontreply", b"1", 0x00000001, 20).unwrap();
123///
124/// let (_, cas_val) = client.increment_cas(b"key:numerical", 10, 1, 20, 0).unwrap();
125/// client.increment_cas(b"key:numerical", 1, 1, 20, cas_val).unwrap();
126/// ```
127pub struct Client {
128    servers: ConsistentHash<ServerRef>,
129}
130
131impl Client {
132    /// Connect to Memcached servers
133    ///
134    /// This function accept multiple servers, servers information should be represented
135    /// as a array of tuples in this form
136    ///
137    /// `(address, weight)`.
138    pub fn connect<S: ToString>(svrs: &[(S, usize)], p: proto::ProtoType) -> io::Result<Client> {
139        Client::conn(svrs, p, None)
140    }
141
142    /// Connect to Memcached servers that require SASL authentication
143    ///
144    /// This function accept multiple servers, servers information should be represented
145    /// as a array of tuples in this form
146    ///
147    /// `(address, weight)`.
148    pub fn connect_sasl<S: ToString>(
149        svrs: &[(S, usize)],
150        p: proto::ProtoType,
151        username: &str,
152        password: &str,
153    ) -> io::Result<Client> {
154        Client::conn(svrs, p, Some(Sasl { username, password }))
155    }
156
157    fn conn<S: ToString>(
158        svrs: &[(S, usize)],
159        p: proto::ProtoType,
160        sasl: Option<Sasl>,
161    ) -> io::Result<Client> {
162        assert!(!svrs.is_empty(), "Server list should not be empty");
163
164        let mut servers = ConsistentHash::new();
165        for (addr, weight) in svrs.iter() {
166            let svr = Server::connect(addr.to_string(), p, &sasl)?;
167            servers.add(&ServerRef(Rc::new(RefCell::new(svr))), *weight);
168        }
169
170        Ok(Client { servers })
171    }
172
173    fn find_server_by_key<'a>(&'a mut self, key: &[u8]) -> &'a mut ServerRef {
174        self.servers.get_mut(key).expect("No valid server found")
175    }
176}
177
178impl Operation for Client {
179    fn set(
180        &mut self,
181        key: &[u8],
182        value: &[u8],
183        flags: u32,
184        expiration: u32,
185    ) -> MemCachedResult<()> {
186        let server = self.find_server_by_key(key);
187        server.borrow_mut().proto.set(key, value, flags, expiration)
188    }
189
190    fn add(
191        &mut self,
192        key: &[u8],
193        value: &[u8],
194        flags: u32,
195        expiration: u32,
196    ) -> MemCachedResult<()> {
197        let server = self.find_server_by_key(key);
198        server.borrow_mut().proto.add(key, value, flags, expiration)
199    }
200
201    fn delete(&mut self, key: &[u8]) -> MemCachedResult<()> {
202        let server = self.find_server_by_key(key);
203        server.borrow_mut().proto.delete(key)
204    }
205
206    fn replace(
207        &mut self,
208        key: &[u8],
209        value: &[u8],
210        flags: u32,
211        expiration: u32,
212    ) -> MemCachedResult<()> {
213        let server = self.find_server_by_key(key);
214        server
215            .borrow_mut()
216            .proto
217            .replace(key, value, flags, expiration)
218    }
219
220    fn get(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32)> {
221        let server = self.find_server_by_key(key);
222        server.borrow_mut().proto.get(key)
223    }
224
225    fn getk(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32)> {
226        let server = self.find_server_by_key(key);
227        server.borrow_mut().proto.getk(key)
228    }
229
230    fn increment(
231        &mut self,
232        key: &[u8],
233        amount: u64,
234        initial: u64,
235        expiration: u32,
236    ) -> MemCachedResult<u64> {
237        let server = self.find_server_by_key(key);
238        server
239            .borrow_mut()
240            .proto
241            .increment(key, amount, initial, expiration)
242    }
243
244    fn decrement(
245        &mut self,
246        key: &[u8],
247        amount: u64,
248        initial: u64,
249        expiration: u32,
250    ) -> MemCachedResult<u64> {
251        let server = self.find_server_by_key(key);
252        server
253            .borrow_mut()
254            .proto
255            .increment(key, amount, initial, expiration)
256    }
257
258    fn append(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
259        let server = self.find_server_by_key(key);
260        server.borrow_mut().proto.append(key, value)
261    }
262
263    fn prepend(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
264        let server = self.find_server_by_key(key);
265        server.borrow_mut().proto.prepend(key, value)
266    }
267
268    fn touch(&mut self, key: &[u8], expiration: u32) -> MemCachedResult<()> {
269        let server = self.find_server_by_key(key);
270        server.borrow_mut().proto.touch(key, expiration)
271    }
272}
273
274impl NoReplyOperation for Client {
275    fn set_noreply(
276        &mut self,
277        key: &[u8],
278        value: &[u8],
279        flags: u32,
280        expiration: u32,
281    ) -> MemCachedResult<()> {
282        let server = self.find_server_by_key(key);
283        server
284            .borrow_mut()
285            .proto
286            .set_noreply(key, value, flags, expiration)
287    }
288
289    fn add_noreply(
290        &mut self,
291        key: &[u8],
292        value: &[u8],
293        flags: u32,
294        expiration: u32,
295    ) -> MemCachedResult<()> {
296        let server = self.find_server_by_key(key);
297        server
298            .borrow_mut()
299            .proto
300            .add_noreply(key, value, flags, expiration)
301    }
302
303    fn delete_noreply(&mut self, key: &[u8]) -> MemCachedResult<()> {
304        let server = self.find_server_by_key(key);
305        server.borrow_mut().proto.delete_noreply(key)
306    }
307
308    fn replace_noreply(
309        &mut self,
310        key: &[u8],
311        value: &[u8],
312        flags: u32,
313        expiration: u32,
314    ) -> MemCachedResult<()> {
315        let server = self.find_server_by_key(key);
316        server
317            .borrow_mut()
318            .proto
319            .replace_noreply(key, value, flags, expiration)
320    }
321
322    fn increment_noreply(
323        &mut self,
324        key: &[u8],
325        amount: u64,
326        initial: u64,
327        expiration: u32,
328    ) -> MemCachedResult<()> {
329        let server = self.find_server_by_key(key);
330        server
331            .borrow_mut()
332            .proto
333            .increment_noreply(key, amount, initial, expiration)
334    }
335
336    fn decrement_noreply(
337        &mut self,
338        key: &[u8],
339        amount: u64,
340        initial: u64,
341        expiration: u32,
342    ) -> MemCachedResult<()> {
343        let server = self.find_server_by_key(key);
344        server
345            .borrow_mut()
346            .proto
347            .decrement_noreply(key, amount, initial, expiration)
348    }
349
350    fn append_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
351        let server = self.find_server_by_key(key);
352        server.borrow_mut().proto.append_noreply(key, value)
353    }
354
355    fn prepend_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
356        let server = self.find_server_by_key(key);
357        server.borrow_mut().proto.prepend_noreply(key, value)
358    }
359}
360
361impl CasOperation for Client {
362    fn set_cas(
363        &mut self,
364        key: &[u8],
365        value: &[u8],
366        flags: u32,
367        expiration: u32,
368        cas: u64,
369    ) -> MemCachedResult<u64> {
370        let server = self.find_server_by_key(key);
371        server
372            .borrow_mut()
373            .proto
374            .set_cas(key, value, flags, expiration, cas)
375    }
376
377    fn add_cas(
378        &mut self,
379        key: &[u8],
380        value: &[u8],
381        flags: u32,
382        expiration: u32,
383    ) -> MemCachedResult<u64> {
384        let server = self.find_server_by_key(key);
385        server
386            .borrow_mut()
387            .proto
388            .add_cas(key, value, flags, expiration)
389    }
390
391    fn replace_cas(
392        &mut self,
393        key: &[u8],
394        value: &[u8],
395        flags: u32,
396        expiration: u32,
397        cas: u64,
398    ) -> MemCachedResult<u64> {
399        let server = self.find_server_by_key(key);
400        server
401            .borrow_mut()
402            .proto
403            .replace_cas(key, value, flags, expiration, cas)
404    }
405
406    fn get_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32, u64)> {
407        let server = self.find_server_by_key(key);
408        server.borrow_mut().proto.get_cas(key)
409    }
410
411    fn getk_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32, u64)> {
412        let server = self.find_server_by_key(key);
413        server.borrow_mut().proto.getk_cas(key)
414    }
415
416    fn increment_cas(
417        &mut self,
418        key: &[u8],
419        amount: u64,
420        initial: u64,
421        expiration: u32,
422        cas: u64,
423    ) -> MemCachedResult<(u64, u64)> {
424        let server = self.find_server_by_key(key);
425        server
426            .borrow_mut()
427            .proto
428            .increment_cas(key, amount, initial, expiration, cas)
429    }
430
431    fn decrement_cas(
432        &mut self,
433        key: &[u8],
434        amount: u64,
435        initial: u64,
436        expiration: u32,
437        cas: u64,
438    ) -> MemCachedResult<(u64, u64)> {
439        let server = self.find_server_by_key(key);
440        server
441            .borrow_mut()
442            .proto
443            .decrement_cas(key, amount, initial, expiration, cas)
444    }
445
446    fn append_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
447        let server = self.find_server_by_key(key);
448        server.borrow_mut().proto.append_cas(key, value, cas)
449    }
450
451    fn prepend_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
452        let server = self.find_server_by_key(key);
453        server.borrow_mut().proto.prepend_cas(key, value, cas)
454    }
455
456    fn touch_cas(&mut self, key: &[u8], expiration: u32, cas: u64) -> MemCachedResult<u64> {
457        let server = self.find_server_by_key(key);
458        server.borrow_mut().proto.touch_cas(key, expiration, cas)
459    }
460}
461
462impl MultiOperation for Client {
463    fn set_multi(&mut self, kv: BTreeMap<&[u8], (&[u8], u32, u32)>) -> MemCachedResult<()> {
464        assert_eq!(self.servers.len(), 1);
465        let server = self.find_server_by_key(kv.keys().next().unwrap());
466        server.borrow_mut().proto.set_multi(kv)
467    }
468    fn delete_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<()> {
469        assert_eq!(self.servers.len(), 1);
470        let server = self.find_server_by_key(keys[0]);
471        server.borrow_mut().proto.delete_multi(keys)
472    }
473    fn increment_multi<'a>(
474        &mut self,
475        kv: HashMap<&'a [u8], (u64, u64, u32)>,
476    ) -> MemCachedResult<HashMap<&'a [u8], u64>> {
477        assert_eq!(self.servers.len(), 1);
478        let server = self.find_server_by_key(kv.keys().next().unwrap());
479        server.borrow_mut().proto.increment_multi(kv)
480    }
481    fn get_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<HashMap<Vec<u8>, (Vec<u8>, u32)>> {
482        assert_eq!(self.servers.len(), 1);
483        let server = self.find_server_by_key(keys[0]);
484        server.borrow_mut().proto.get_multi(keys)
485    }
486}
487
488#[cfg(all(test, feature = "nightly"))]
489mod test {
490    use super::Client;
491    use crate::proto::{NoReplyOperation, Operation, ProtoType};
492    use rand::random;
493    use test::Bencher;
494
495    fn generate_data(len: usize) -> Vec<u8> {
496        (0..len).map(|_| random()).collect()
497    }
498
499    #[bench]
500    fn bench_set_64(b: &mut Bencher) {
501        let key = b"test:test_bench";
502        let val = generate_data(64);
503
504        let mut client =
505            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
506
507        b.iter(|| client.set(key, &val[..], 0, 2));
508    }
509
510    #[bench]
511    fn bench_set_noreply_64(b: &mut Bencher) {
512        let key = b"test:test_bench";
513        let val = generate_data(64);
514
515        let mut client =
516            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
517
518        b.iter(|| client.set_noreply(key, &val[..], 0, 2));
519    }
520
521    #[bench]
522    fn bench_set_512(b: &mut Bencher) {
523        let key = b"test:test_bench";
524        let val = generate_data(512);
525
526        let mut client =
527            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
528
529        b.iter(|| client.set(key, &val[..], 0, 2));
530    }
531
532    #[bench]
533    fn bench_set_noreply_512(b: &mut Bencher) {
534        let key = b"test:test_bench";
535        let val = generate_data(512);
536
537        let mut client =
538            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
539
540        b.iter(|| client.set_noreply(key, &val[..], 0, 2));
541    }
542
543    #[bench]
544    fn bench_set_1024(b: &mut Bencher) {
545        let key = b"test:test_bench";
546        let val = generate_data(1024);
547
548        let mut client =
549            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
550
551        b.iter(|| client.set(key, &val[..], 0, 2));
552    }
553
554    #[bench]
555    fn bench_set_noreply_1024(b: &mut Bencher) {
556        let key = b"test:test_bench";
557        let val = generate_data(1024);
558
559        let mut client =
560            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
561
562        b.iter(|| client.set_noreply(key, &val[..], 0, 2));
563    }
564
565    #[bench]
566    fn bench_set_4096(b: &mut Bencher) {
567        let key = b"test:test_bench";
568        let val = generate_data(4096);
569
570        let mut client =
571            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
572
573        b.iter(|| client.set(key, &val[..], 0, 2));
574    }
575
576    #[bench]
577    fn bench_set_noreply_4096(b: &mut Bencher) {
578        let key = b"test:test_bench";
579        let val = generate_data(4096);
580
581        let mut client =
582            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
583
584        b.iter(|| client.set_noreply(key, &val[..], 0, 2));
585    }
586
587    #[bench]
588    fn bench_set_16384(b: &mut Bencher) {
589        let key = b"test:test_bench";
590        let val = generate_data(16384);
591
592        let mut client =
593            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
594
595        b.iter(|| client.set(key, &val[..], 0, 2));
596    }
597
598    #[bench]
599    fn bench_set_noreply_16384(b: &mut Bencher) {
600        let key = b"test:test_bench";
601        let val = generate_data(16384);
602
603        let mut client =
604            Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
605
606        b.iter(|| client.set_noreply(key, &val[..], 0, 2));
607    }
608}